You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/04/06 17:12:45 UTC

svn commit: r1671547 - in /lucene/dev/trunk/solr/solrj/src: java/org/apache/solr/client/solrj/io/CloudSolrStream.java test/org/apache/solr/client/solrj/io/StreamingTest.java

Author: jbernste
Date: Mon Apr  6 15:12:44 2015
New Revision: 1671547

URL: http://svn.apache.org/r1671547
Log:
SOLR-7352: Synchronize CloudSolrStream EOF Tuple Map

Modified:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java?rev=1671547&r1=1671546&r2=1671547&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java Mon Apr  6 15:12:44 2015
@@ -108,7 +108,7 @@ public class CloudSolrStream extends Tup
   public void open() throws IOException {
     this.tuples = new TreeSet();
     this.solrStreams = new ArrayList();
-    this.eofTuples = new HashMap();
+    this.eofTuples = Collections.synchronizedMap(new HashMap());
     if(this.cache != null) {
       this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
     } else {

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java?rev=1671547&r1=1671546&r2=1671547&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java Mon Apr  6 15:12:44 2015
@@ -228,7 +228,7 @@ public class StreamingTest extends Abstr
 
     String zkHost = zkServer.getZkAddress();
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
+    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
     RankStream rstream = new RankStream(stream, 3, new DescFieldComp("a_i"));
     List<Tuple> tuples = getTuples(rstream);
@@ -259,7 +259,7 @@ public class StreamingTest extends Abstr
 
     String zkHost = zkServer.getZkAddress();
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
     CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
     RankStream rstream = new RankStream(stream, 11, new DescFieldComp("a_i"));
     ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_i"));
@@ -335,7 +335,7 @@ public class StreamingTest extends Abstr
 
     Tuple t0 = tuples.get(0);
     List<Map> maps0 = t0.getMaps();
-    assertMaps(maps0, 0, 2,1, 9);
+    assertMaps(maps0, 0, 2, 1, 9);
 
     Tuple t1 = tuples.get(1);
     List<Map> maps1 = t1.getMaps();
@@ -351,6 +351,37 @@ public class StreamingTest extends Abstr
     commit();
   }
 
+  private void testZeroReducerStream() throws Exception {
+
+    //Gracefully handle zero results
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    //Test with spaces in the parameter lists.
+    Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
+
+    List<Tuple> tuples = getTuples(rstream);
+
+    assert(tuples.size() == 0);
+
+    del("*:*");
+    commit();
+  }
+
 
   private void testParallelReducerStream() throws Exception {
 
@@ -423,6 +454,35 @@ public class StreamingTest extends Abstr
     commit();
   }
 
+  private void testZeroParallelReducerStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new AscFieldComp("a_s"));
+
+    List<Tuple> tuples = getTuples(pstream);
+    assert(tuples.size() == 0);
+    del("*:*");
+    commit();
+  }
+
+
   private void testTuple() throws Exception {
 
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
@@ -698,11 +758,13 @@ public class StreamingTest extends Abstr
     testRankStream();
     testMergeStream();
     testReducerStream();
+    testZeroReducerStream();
     testParallelEOF();
     testParallelUniqueStream();
     testParallelRankStream();
     testParallelMergeStream();
     testParallelReducerStream();
+    testZeroParallelReducerStream();
   }
 
   protected Map mapParams(String... vals) {