You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2018/05/18 21:19:26 UTC
lucene-solr:master: SOLR-12355: Fixes hash conflict in HashJoinStream
and OuterHashJoinStream
Repository: lucene-solr
Updated Branches:
refs/heads/master 4da0d6898 -> f506bc9cb
SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f506bc9c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f506bc9c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f506bc9c
Branch: refs/heads/master
Commit: f506bc9cb7f1e82295c9c367487d49a80e767731
Parents: 4da0d68
Author: Dennis Gove <dp...@gmail.com>
Authored: Tue May 15 10:23:25 2018 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Fri May 18 17:14:39 2018 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../client/solrj/io/stream/HashJoinStream.java | 12 ++--
.../solrj/io/stream/OuterHashJoinStream.java | 2 +-
.../solrj/io/stream/StreamDecoratorTest.java | 76 ++++++++++++++++++++
4 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 980d20b..7187360 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -223,6 +223,8 @@ Bug Fixes
* SOLR-12200: abandon OverseerExitThread when ZkController is closed. (Mikhail Khludnev)
+* SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream (Dennis Gove)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
index ffba0ca..5a534df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
@@ -50,10 +50,10 @@ public class HashJoinStream extends TupleStream implements Expressible {
protected TupleStream fullStream;
protected List<String> leftHashOn;
protected List<String> rightHashOn;
- protected HashMap<Integer, List<Tuple>> hashedTuples;
+ protected HashMap<String, List<Tuple>> hashedTuples;
protected Tuple workingFullTuple = null;
- protected Integer workingFullHash = null;
+ protected String workingFullHash = null;
protected int workngHashSetIdx = 0;
public HashJoinStream(TupleStream fullStream, TupleStream hashStream, List<String> hashOn) throws IOException {
@@ -199,7 +199,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
Tuple tuple = hashStream.read();
while(!tuple.EOF){
- Integer hash = calculateHash(tuple, rightHashOn);
+ String hash = computeHash(tuple, rightHashOn);
if(null != hash){
if(hashedTuples.containsKey(hash)){
hashedTuples.get(hash).add(tuple);
@@ -214,7 +214,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
}
}
- protected Integer calculateHash(Tuple tuple, List<String> hashOn){
+ protected String computeHash(Tuple tuple, List<String> hashOn){
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
Object obj = tuple.get(part);
@@ -225,7 +225,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
sb.append("::"); // this is here to separate fields
}
- return sb.toString().hashCode();
+ return sb.toString();
}
public void close() throws IOException {
@@ -246,7 +246,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
// If fullTuple doesn't have a valid hash or if there is no doc to
// join with then retry loop - keep going until we find one
- Integer fullHash = calculateHash(fullTuple, leftHashOn);
+ String fullHash = computeHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
continue findNextWorkingFullTuple;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
index 813dc79..aaec111 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
@@ -100,7 +100,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
// If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then
// return the tuple from fullStream.
// This is an outer join so there is no requirement there be a matching value in the hashed stream
- Integer fullHash = calculateHash(fullTuple, leftHashOn);
+ String fullHash = computeHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
return fullTuple.clone();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 2afc74f..a2412df 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -1942,6 +1942,82 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
solrClientCache.close();
}
}
+
+ @Test
+ public void testHashJoinStreamWithKnownConflict() throws Exception {
+ // Regression test: the two bbid_s values differ only in "H1" vs "GP", and 'H'*31+'1' == 'G'*31+'P' (2281), so otherwise-identical strings share one String.hashCode().
+ new UpdateRequest()
+ .add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge")
+ .add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ StreamExpression expression;
+ TupleStream stream;
+ List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ StreamFactory factory = new StreamFactory()
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("hashJoin", HashJoinStream.class);
+ try {
+ // Inner join of the "left" doc against the hashed "right" doc on (bbid_s, ykey_s): ykey_s is equal but bbid_s is not, so no joined tuple is expected.
+ expression = StreamExpressionParser.parse("hashJoin("
+ + " search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\"),"
+ + " hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\"),"
+ + " on=\"bbid_s,ykey_s\""
+ + ")");
+ stream = new HashJoinStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(0, tuples.size());
+
+
+ } finally {
+ solrClientCache.close();
+ }
+ }
+
+ @Test
+ public void testOuterHashJoinStreamWithKnownConflict() throws Exception {
+ // Regression test: the two bbid_s values differ only in "H1" vs "GP", and 'H'*31+'1' == 'G'*31+'P' (2281), so otherwise-identical strings share one String.hashCode().
+ new UpdateRequest()
+ .add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge")
+ .add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge", "extra_s", "foo")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ StreamExpression expression;
+ TupleStream stream;
+ List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ StreamFactory factory = new StreamFactory()
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("outerHashJoin", OuterHashJoinStream.class);
+ try {
+ // Outer join of the "left" doc against the hashed "right" doc on (bbid_s, ykey_s); only the right side requests extra_s.
+ expression = StreamExpressionParser.parse("outerHashJoin("
+ + " search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\"),"
+ + " hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s,extra_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\"),"
+ + " on=\"bbid_s,ykey_s\""
+ + ")");
+ stream = new OuterHashJoinStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ // Exactly one tuple is expected, and it must not carry extra_s, which was indexed only on the "right" doc whose bbid_s differs.
+ assertEquals(1, tuples.size());
+ assertFalse(tuples.get(0).fields.containsKey("extra_s"));
+
+ } finally {
+ solrClientCache.close();
+ }
+ }
@Test
public void testOuterHashJoinStream() throws Exception {