You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/03/10 15:25:53 UTC
svn commit: r1665565 - in /lucene/dev/trunk/solr/solrj/src:
java/org/apache/solr/client/solrj/io/ParallelStream.java
test/org/apache/solr/client/solrj/io/StreamingTest.java
Author: jbernste
Date: Tue Mar 10 14:25:53 2015
New Revision: 1665565
URL: http://svn.apache.org/r1665565
Log:
SOLR-7225: ParallelStream chooses workers incorrectly
Modified:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java?rev=1665565&r1=1665564&r2=1665565&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java Tue Mar 10 14:25:53 2015
@@ -129,30 +129,36 @@ public class ParallelStream extends Clou
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
long time = System.currentTimeMillis();
- int workerNum = 0;
+ List<Replica> shuffler = new ArrayList();
for(Slice slice : slices) {
- HashMap params = new HashMap();
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica replica : replicas) {
+ shuffler.add(replica);
+ }
+ }
+
+ if(workers > shuffler.size()) {
+ throw new IOException("Number of workers exceeds nodes in the worker collection");
+ }
+
+ Collections.shuffle(shuffler, new Random(time));
+ for(int w=0; w<workers; w++) {
+ HashMap params = new HashMap();
params.put("distrib","false"); // We are the aggregator.
params.put("numWorkers", workers);
- params.put("workerID", workerNum);
+ params.put("workerID", w);
params.put("stream", this.encoded);
params.put("qt","/stream");
-
- Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList();
- for(Replica replica : replicas) {
- shuffler.add(replica);
- }
-
- Collections.shuffle(shuffler, new Random(time));
- Replica rep = shuffler.get(0);
+ Replica rep = shuffler.get(w);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
SolrStream solrStream = new SolrStream(url, params);
solrStreams.add(solrStream);
- ++workerNum;
}
+
+ assert(solrStreams.size() == workers);
+
} catch (Exception e) {
throw new IOException(e);
}
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java?rev=1665565&r1=1665564&r2=1665565&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java Tue Mar 10 14:25:53 2015
@@ -907,6 +907,34 @@ public class StreamingTest extends Abstr
commit();
}
+ private void testParallelStreamSingleWorker() throws Exception {
+
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+ indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
+ CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+ Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s");
+ CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+ FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
+ ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 1, new AscFieldComp("a_s"));
+ List<Tuple> tuples = getTuples(pstream);
+
+ assert(tuples.size() == 2);
+ assertOrder(tuples, 0,2);
+
+ del("*:*");
+ commit();
+ }
private void testParallelHashJoinStream() {
@@ -1119,6 +1147,7 @@ public class StreamingTest extends Abstr
testHashJoinStream();
testMergeJoinStream();
testMergeStream();
+ testParallelStreamSingleWorker();
testParallelStream();
testParallelRollupStream();
testParallelMetricStream();