You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by dp...@apache.org on 2018/05/18 21:19:26 UTC

lucene-solr:master: SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream

Repository: lucene-solr
Updated Branches:
  refs/heads/master 4da0d6898 -> f506bc9cb


SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f506bc9c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f506bc9c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f506bc9c

Branch: refs/heads/master
Commit: f506bc9cb7f1e82295c9c367487d49a80e767731
Parents: 4da0d68
Author: Dennis Gove <dp...@gmail.com>
Authored: Tue May 15 10:23:25 2018 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Fri May 18 17:14:39 2018 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../client/solrj/io/stream/HashJoinStream.java  | 12 ++--
 .../solrj/io/stream/OuterHashJoinStream.java    |  2 +-
 .../solrj/io/stream/StreamDecoratorTest.java    | 76 ++++++++++++++++++++
 4 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 980d20b..7187360 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -223,6 +223,8 @@ Bug Fixes
 
 * SOLR-12200: abandon OverseerExitThread when ZkController is closed. (Mikhail Khludnev) 
 
+* SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream (Dennis Gove)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
index ffba0ca..5a534df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java
@@ -50,10 +50,10 @@ public class HashJoinStream extends TupleStream implements Expressible {
   protected TupleStream fullStream;
   protected List<String> leftHashOn;
   protected List<String> rightHashOn;
-  protected HashMap<Integer, List<Tuple>> hashedTuples;
+  protected HashMap<String, List<Tuple>> hashedTuples;
   
   protected Tuple workingFullTuple = null;
-  protected Integer workingFullHash = null;
+  protected String workingFullHash = null;
   protected int workngHashSetIdx = 0;
   
   public HashJoinStream(TupleStream fullStream, TupleStream hashStream, List<String> hashOn) throws IOException {
@@ -199,7 +199,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
     
     Tuple tuple = hashStream.read();
     while(!tuple.EOF){
-      Integer hash = calculateHash(tuple, rightHashOn);
+      String hash = computeHash(tuple, rightHashOn);
       if(null != hash){
         if(hashedTuples.containsKey(hash)){
           hashedTuples.get(hash).add(tuple);
@@ -214,7 +214,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
     }
   }
   
-  protected Integer calculateHash(Tuple tuple, List<String> hashOn){
+  protected String computeHash(Tuple tuple, List<String> hashOn){
     StringBuilder sb = new StringBuilder();
     for(String part : hashOn){
       Object obj = tuple.get(part);
@@ -225,7 +225,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
       sb.append("::"); // this is here to separate fields
     }
     
-    return sb.toString().hashCode();
+    return sb.toString();
   }
 
   public void close() throws IOException {
@@ -246,7 +246,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
       
       // If fullTuple doesn't have a valid hash or if there is no doc to 
       // join with then retry loop - keep going until we find one
-      Integer fullHash = calculateHash(fullTuple, leftHashOn);
+      String fullHash = computeHash(fullTuple, leftHashOn);
       if(null == fullHash || !hashedTuples.containsKey(fullHash)){
         continue findNextWorkingFullTuple;
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
index 813dc79..aaec111 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java
@@ -100,7 +100,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
       // If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then
       // return the tuple from fullStream.
       // This is an outer join so there is no requirement there be a matching value in the hashed stream
-      Integer fullHash = calculateHash(fullTuple, leftHashOn);
+      String fullHash = computeHash(fullTuple, leftHashOn);
       if(null == fullHash || !hashedTuples.containsKey(fullHash)){
         return fullTuple.clone();
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f506bc9c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 2afc74f..a2412df 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -1942,6 +1942,82 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       solrClientCache.close();
     }
   }
+  
+  @Test
+  public void testHashJoinStreamWithKnownConflict() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge")
+        .add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("hashJoin", HashJoinStream.class);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("hashJoin("
+          + "  search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\"),"
+          + "  hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\"),"
+          + "  on=\"bbid_s,ykey_s\""
+          + ")");
+      stream = new HashJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      
+      assertEquals(0, tuples.size());
+    
+    
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+  @Test
+  public void testOuterHashJoinStreamWithKnownConflict() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge")
+        .add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge", "extra_s", "foo")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("outerHashJoin", OuterHashJoinStream.class);
+    try {
+      // Basic test
+      expression = StreamExpressionParser.parse("outerHashJoin("
+          + "  search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\"),"
+          + "  hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s,extra_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\"),"
+          + "  on=\"bbid_s,ykey_s\""
+          + ")");
+      stream = new OuterHashJoinStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      
+      assertEquals(1, tuples.size());
+      assertFalse(tuples.get(0).fields.containsKey("extra_s"));
+    
+    } finally {
+      solrClientCache.close();
+    }
+  }
 
   @Test
   public void testOuterHashJoinStream() throws Exception {