You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/04/06 13:57:34 UTC

[08/12] lucene-solr:jira/solr-9959: SOLR-10426: Add shuffle Streaming Expression

SOLR-10426: Add shuffle Streaming Expression


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/37b6c605
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/37b6c605
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/37b6c605

Branch: refs/heads/jira/solr-9959
Commit: 37b6c60548f3288ee057dbd8ce1e1594ab48d314
Parents: dbd22a6
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Apr 5 17:57:11 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Apr 5 17:57:24 2017 -0400

----------------------------------------------------------------------
 .../org/apache/solr/handler/StreamHandler.java  |  39 +------
 .../client/solrj/io/stream/CloudSolrStream.java |  13 ++-
 .../client/solrj/io/stream/ShuffleStream.java   | 103 +++++++++++++++++++
 .../solrj/io/stream/StreamExpressionTest.java   |  86 +++++++++++++++-
 4 files changed, 199 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 599924e..8f123ec 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -74,43 +74,7 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
 import org.apache.solr.client.solrj.io.ops.DistinctOperation;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
 import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
-import org.apache.solr.client.solrj.io.stream.CartesianProductStream;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.CommitStream;
-import org.apache.solr.client.solrj.io.stream.ComplementStream;
-import org.apache.solr.client.solrj.io.stream.DaemonStream;
-import org.apache.solr.client.solrj.io.stream.ExceptionStream;
-import org.apache.solr.client.solrj.io.stream.ExecutorStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
-import org.apache.solr.client.solrj.io.stream.FetchStream;
-import org.apache.solr.client.solrj.io.stream.HashJoinStream;
-import org.apache.solr.client.solrj.io.stream.HavingStream;
-import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
-import org.apache.solr.client.solrj.io.stream.IntersectStream;
-import org.apache.solr.client.solrj.io.stream.JDBCStream;
-import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
-import org.apache.solr.client.solrj.io.stream.MergeStream;
-import org.apache.solr.client.solrj.io.stream.ModelStream;
-import org.apache.solr.client.solrj.io.stream.NullStream;
-import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
-import org.apache.solr.client.solrj.io.stream.ParallelStream;
-import org.apache.solr.client.solrj.io.stream.PriorityStream;
-import org.apache.solr.client.solrj.io.stream.RandomStream;
-import org.apache.solr.client.solrj.io.stream.RankStream;
-import org.apache.solr.client.solrj.io.stream.ReducerStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
-import org.apache.solr.client.solrj.io.stream.SelectStream;
-import org.apache.solr.client.solrj.io.stream.SignificantTermsStream;
-import org.apache.solr.client.solrj.io.stream.SortStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TextLogitStream;
-import org.apache.solr.client.solrj.io.stream.TopicStream;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.UniqueStream;
-import org.apache.solr.client.solrj.io.stream.UpdateStream;
+import org.apache.solr.client.solrj.io.stream.*;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@@ -223,6 +187,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       .withFunctionName("priority", PriorityStream.class)
       .withFunctionName("significantTerms", SignificantTermsStream.class)
       .withFunctionName("cartesianProduct", CartesianProductStream.class)
+      .withFunctionName("shuffle", ShuffleStream.class)
       
       // metrics
       .withFunctionName("min", MinMetric.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 1acd79d..7161dc4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -80,7 +80,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   protected String zkHost;
   protected String collection;
   protected SolrParams params;
-  private Map<String, String> fieldMappings;
+  protected Map<String, String> fieldMappings;
   protected StreamComparator comp;
   private boolean trace;
   protected transient Map<String, Tuple> eofTuples;
@@ -191,7 +191,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
     
     // function name
-    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
     
     // collection
     expression.addParameter(collection);
@@ -254,7 +254,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     return explanation;
   }
 
-  private void init(String collectionName, String zkHost, SolrParams params) throws IOException {
+  protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
     this.zkHost = zkHost;
     this.collection = collectionName;
     this.params = new ModifiableSolrParams(params);
@@ -405,7 +405,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
       Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
 
-      ModifiableSolrParams mParams = new ModifiableSolrParams(params); 
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      mParams = adjustParams(mParams);
       mParams.set(DISTRIB, "false"); // We are the aggregator.
 
       Set<String> liveNodes = clusterState.getLiveNodes();
@@ -571,4 +572,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
       }
     }
   }
+
+  protected ModifiableSolrParams adjustParams(ModifiableSolrParams params) { // Extension hook: the base implementation returns the params unchanged; subclasses may override to alter them.
+    return params;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
new file mode 100644
index 0000000..d30918b
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+
+public class ShuffleStream extends CloudSolrStream implements Expressible { // CloudSolrStream variant whose adjustParams override sets qt to "/export".
+  // Builds the stream from an expression of the form name(collection, namedParam=value..., [aliases="a=b,..."], [zkHost="..."]); throws IOException on malformed input.
+  public ShuffleStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // Pull the operands out of the expression: the first positional value is the collection, the rest are looked up as named operands
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter aliasExpression = factory.getNamedOperand(expression, "aliases");
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // The collection name is mandatory
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Validate there are no unknown parameters: total operands must equal the collection name plus the named parameters (zkHost and aliases are already included in namedParams, so they are not counted twice)
+    if(expression.getParameters().size() != 1 + namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
+    }
+
+    // At least one named parameter is required. NOTE(review): zkHost or aliases alone satisfy this check even though they are not forwarded as Solr params - confirm intended
+    if(0 == namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+    // Copy every named parameter except zkHost and aliases into the Solr params, trimming each value
+    ModifiableSolrParams mParams = new ModifiableSolrParams();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
+        mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // Aliases are optional; when given as a plain value, split the comma-separated origName=newName pairs into fieldMappings
+    if(null != aliasExpression && aliasExpression.getParameter() instanceof StreamExpressionValue){
+      fieldMappings = new HashMap<>();
+      for(String mapping : ((StreamExpressionValue)aliasExpression.getParameter()).getValue().split(",")){
+        String[] parts = mapping.trim().split("=");
+        if(2 == parts.length){
+          fieldMappings.put(parts[0], parts[1]);
+        }
+        else{
+          throw new IOException(String.format(Locale.ROOT,"invalid expression %s - alias expected of the format origName=newName",expression));
+        }
+      }
+    }
+
+    // zkHost is optional: when absent, fall back to the factory's zkHost for this collection, then to the factory default
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // Everything required is resolved; hand off to init(...), which this class does not define itself
+    init(collectionName, zkHost, mParams);
+  }
+  // Sets the qt parameter to "/export" on the given params and returns that same instance. NOTE(review): looks like an override of a superclass hook but has no @Override - confirm
+  public ModifiableSolrParams adjustParams(ModifiableSolrParams mParams) {
+    mParams.set(CommonParams.QT, "/export");
+    return mParams;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 581013f..bb0bd7e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -1096,7 +1096,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue("blah blah blah 9".equals(t.getString("subject")));
 
     //Change the batch size
-    stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
+    stream = factory.constructStream("fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
     context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -1603,6 +1603,90 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   }
 
+  @Test
+  public void testParallelShuffleStream() throws Exception {
+    // Index 57 docs (ids 0-56) carrying six distinct a_f values: 0, 1, 3, 4, 5 and 1000.
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
+        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
+        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
+        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "9", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "10", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "11", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "12", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "13", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "14", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "15", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "16", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "17", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "18", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "19", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "20", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "21", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "22", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "23", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "24", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "25", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "26", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "27", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "28", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "29", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "30", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "31", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "32", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "33", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "34", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "35", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "36", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "37", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "38", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "39", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "40", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "41", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "42", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "43", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "44", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "45", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "46", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "47", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "48", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "49", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "50", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "51", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "52", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "53", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "54", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "55", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .add(id, "56", "a_s", "hello1", "a_i", "13", "a_f", "1000")
+
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+        .withFunctionName("shuffle", ShuffleStream.class)
+        .withFunctionName("unique", UniqueStream.class)
+        .withFunctionName("parallel", ParallelStream.class);
+    // parallel(...) with workers=2 wraps unique(shuffle(...)) partitioned on a_f; the assertions below expect one tuple per distinct a_f value.
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
+    // NOTE(review): the inner shuffle targets the literal collection1 rather than COLLECTIONORALIAS - confirm the two always match.
+    List<Tuple> tuples = getTuples(pstream);
+    assert(tuples.size() == 6);
+    assertOrder(tuples, 0, 1, 3, 4, 6, 56);
+    // NOTE(review): the size checks in this test are Java assert statements, which are only evaluated when assertions are enabled (-ea).
+    // Check the EOF tuples exposed by the parallel stream
+
+    Map<String,Tuple> eofTuples = pstream.getEofTuples();
+    assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+    assert(pstream.toExpression(streamFactory).toString().contains("shuffle"));
+  }
+
+
+  @Test
   public void testParallelReducerStream() throws Exception {
 
     new UpdateRequest()