Posted to commits@lucene.apache.org by jb...@apache.org on 2015/03/10 15:25:53 UTC

svn commit: r1665565 - in /lucene/dev/trunk/solr/solrj/src: java/org/apache/solr/client/solrj/io/ParallelStream.java test/org/apache/solr/client/solrj/io/StreamingTest.java

Author: jbernste
Date: Tue Mar 10 14:25:53 2015
New Revision: 1665565

URL: http://svn.apache.org/r1665565
Log:
SOLR-7225: ParallelStream chooses workers incorrectly

Modified:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java?rev=1665565&r1=1665564&r2=1665565&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java Tue Mar 10 14:25:53 2015
@@ -129,30 +129,36 @@ public class ParallelStream extends Clou
       ClusterState clusterState = zkStateReader.getClusterState();
       Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
       long time = System.currentTimeMillis();
-      int workerNum = 0;
+      List<Replica> shuffler = new ArrayList();
       for(Slice slice : slices) {
-        HashMap params = new HashMap();
+        Collection<Replica> replicas = slice.getReplicas();
+        for (Replica replica : replicas) {
+          shuffler.add(replica);
+        }
+      }
+
+      if(workers > shuffler.size()) {
+        throw new IOException("Number of workers exceeds nodes in the worker collection");
+      }
+
+      Collections.shuffle(shuffler, new Random(time));
 
+      for(int w=0; w<workers; w++) {
+        HashMap params = new HashMap();
         params.put("distrib","false"); // We are the aggregator.
         params.put("numWorkers", workers);
-        params.put("workerID", workerNum);
+        params.put("workerID", w);
         params.put("stream", this.encoded);
         params.put("qt","/stream");
-
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList();
-        for(Replica replica : replicas) {
-          shuffler.add(replica);
-        }
-
-        Collections.shuffle(shuffler, new Random(time));
-        Replica rep = shuffler.get(0);
+        Replica rep = shuffler.get(w);
         ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
         String url = zkProps.getCoreUrl();
         SolrStream solrStream = new SolrStream(url, params);
         solrStreams.add(solrStream);
-        ++workerNum;
       }
+
+      assert(solrStreams.size() == workers);
+
     } catch (Exception e) {
       throw new IOException(e);
     }

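[Editorial note] In short, the commit replaces the per-slice worker numbering (one replica picked per slice, workerID incremented per slice) with a single pass that gathers every replica across the collection's active slices, shuffles the pool once with a time-seeded Random, and assigns worker IDs 0..workers-1 to the first `workers` replicas, rejecting requests for more workers than there are replicas. A minimal standalone sketch of that selection logic follows; it assumes the SolrJ 5.x cloud classes, and the class and method names here are illustrative only, not part of the commit:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.Slice;

    class WorkerSelectionSketch {
      // Pick one distinct replica per worker from the whole collection,
      // mirroring the approach introduced by this commit.
      static List<Replica> pickWorkerReplicas(Collection<Slice> slices,
                                              int workers,
                                              long seed) throws IOException {
        // Pool every replica from every active slice.
        List<Replica> shuffler = new ArrayList<>();
        for (Slice slice : slices) {
          shuffler.addAll(slice.getReplicas());
        }

        // Fail fast instead of assigning worker IDs that can never be served.
        if (workers > shuffler.size()) {
          throw new IOException("Number of workers exceeds nodes in the worker collection");
        }

        // One shuffle for the whole pool, so load spreads across replicas
        // rather than always hitting the first replica of each slice.
        Collections.shuffle(shuffler, new Random(seed));

        // Worker w is handed shuffler.get(w); copy the sublist so callers
        // are not tied to the backing list.
        return new ArrayList<>(shuffler.subList(0, workers));
      }
    }
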
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java?rev=1665565&r1=1665564&r2=1665565&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java Tue Mar 10 14:25:53 2015
@@ -907,6 +907,34 @@ public class StreamingTest extends Abstr
     commit();
   }
 
+  private void testParallelStreamSingleWorker() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+
+    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
+    ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 1, new AscFieldComp("a_s"));
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 2);
+    assertOrder(tuples, 0,2);
+
+    del("*:*");
+    commit();
+  }
 
 
   private void testParallelHashJoinStream() {
@@ -1119,6 +1147,7 @@ public class StreamingTest extends Abstr
     testHashJoinStream();
     testMergeJoinStream();
     testMergeStream();
+    testParallelStreamSingleWorker();
     testParallelStream();
     testParallelRollupStream();
     testParallelMetricStream();
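
[Editorial note] A hedged usage sketch following the test pattern above; mapParams, AscFieldComp, getTuples, and zkHost are the StreamingTest helpers and fixtures, and "collection1" plus the field names are placeholders. With this commit, requesting more workers than the worker collection has replicas is rejected with an IOException (presumably when the parallel stream constructs its worker streams), rather than quietly tying the number of launched workers to the slice count:

    // Two workers partitioned on a_s; each worker is assigned a distinct replica.
    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_s asc", "partitionKeys", "a_s");
    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
    ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new AscFieldComp("a_s"));
    // Throws IOException if 2 > total replica count of "collection1".
    List<Tuple> tuples = getTuples(pstream);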