You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/04/06 17:12:45 UTC
svn commit: r1671547 - in /lucene/dev/trunk/solr/solrj/src:
java/org/apache/solr/client/solrj/io/CloudSolrStream.java
test/org/apache/solr/client/solrj/io/StreamingTest.java
Author: jbernste
Date: Mon Apr 6 15:12:44 2015
New Revision: 1671547
URL: http://svn.apache.org/r1671547
Log:
SOLR-7352: Synchronize CloudSolrStream EOF Tuple Map
Modified:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java?rev=1671547&r1=1671546&r2=1671547&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java Mon Apr 6 15:12:44 2015
@@ -108,7 +108,7 @@ public class CloudSolrStream extends Tup
public void open() throws IOException {
this.tuples = new TreeSet();
this.solrStreams = new ArrayList();
- this.eofTuples = new HashMap();
+ this.eofTuples = Collections.synchronizedMap(new HashMap());
if(this.cache != null) {
this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
} else {
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java?rev=1671547&r1=1671546&r2=1671547&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java Mon Apr 6 15:12:44 2015
@@ -228,7 +228,7 @@ public class StreamingTest extends Abstr
String zkHost = zkServer.getZkAddress();
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
+ Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
RankStream rstream = new RankStream(stream, 3, new DescFieldComp("a_i"));
List<Tuple> tuples = getTuples(rstream);
@@ -259,7 +259,7 @@ public class StreamingTest extends Abstr
String zkHost = zkServer.getZkAddress();
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+ Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
RankStream rstream = new RankStream(stream, 11, new DescFieldComp("a_i"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_i"));
@@ -335,7 +335,7 @@ public class StreamingTest extends Abstr
Tuple t0 = tuples.get(0);
List<Map> maps0 = t0.getMaps();
- assertMaps(maps0, 0, 2,1, 9);
+ assertMaps(maps0, 0, 2, 1, 9);
Tuple t1 = tuples.get(1);
List<Map> maps1 = t1.getMaps();
@@ -351,6 +351,37 @@ public class StreamingTest extends Abstr
commit();
}
+ private void testZeroReducerStream() throws Exception {
+
+ //Gracefully handle zero results
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ //Test with spaces in the parameter lists.
+ Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
+
+ List<Tuple> tuples = getTuples(rstream);
+
+ assert(tuples.size() == 0);
+
+ del("*:*");
+ commit();
+ }
+
private void testParallelReducerStream() throws Exception {
@@ -423,6 +454,35 @@ public class StreamingTest extends Abstr
commit();
}
+ private void testZeroParallelReducerStream() throws Exception {
+
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new AscFieldComp("a_s"));
+
+ List<Tuple> tuples = getTuples(pstream);
+ assert(tuples.size() == 0);
+ del("*:*");
+ commit();
+ }
+
+
private void testTuple() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
@@ -698,11 +758,13 @@ public class StreamingTest extends Abstr
testRankStream();
testMergeStream();
testReducerStream();
+ testZeroReducerStream();
testParallelEOF();
testParallelUniqueStream();
testParallelRankStream();
testParallelMergeStream();
testParallelReducerStream();
+ testZeroParallelReducerStream();
}
protected Map mapParams(String... vals) {