You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/04/06 13:57:34 UTC
[08/12] lucene-solr:jira/solr-9959: SOLR-10426: Add shuffle Streaming Expression
SOLR-10426: Add shuffle Streaming Expression
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/37b6c605
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/37b6c605
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/37b6c605
Branch: refs/heads/jira/solr-9959
Commit: 37b6c60548f3288ee057dbd8ce1e1594ab48d314
Parents: dbd22a6
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Apr 5 17:57:11 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Apr 5 17:57:24 2017 -0400
----------------------------------------------------------------------
.../org/apache/solr/handler/StreamHandler.java | 39 +------
.../client/solrj/io/stream/CloudSolrStream.java | 13 ++-
.../client/solrj/io/stream/ShuffleStream.java | 103 +++++++++++++++++++
.../solrj/io/stream/StreamExpressionTest.java | 86 +++++++++++++++-
4 files changed, 199 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 599924e..8f123ec 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -74,43 +74,7 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
-import org.apache.solr.client.solrj.io.stream.CartesianProductStream;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.CommitStream;
-import org.apache.solr.client.solrj.io.stream.ComplementStream;
-import org.apache.solr.client.solrj.io.stream.DaemonStream;
-import org.apache.solr.client.solrj.io.stream.ExceptionStream;
-import org.apache.solr.client.solrj.io.stream.ExecutorStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
-import org.apache.solr.client.solrj.io.stream.FetchStream;
-import org.apache.solr.client.solrj.io.stream.HashJoinStream;
-import org.apache.solr.client.solrj.io.stream.HavingStream;
-import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
-import org.apache.solr.client.solrj.io.stream.IntersectStream;
-import org.apache.solr.client.solrj.io.stream.JDBCStream;
-import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
-import org.apache.solr.client.solrj.io.stream.MergeStream;
-import org.apache.solr.client.solrj.io.stream.ModelStream;
-import org.apache.solr.client.solrj.io.stream.NullStream;
-import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
-import org.apache.solr.client.solrj.io.stream.ParallelStream;
-import org.apache.solr.client.solrj.io.stream.PriorityStream;
-import org.apache.solr.client.solrj.io.stream.RandomStream;
-import org.apache.solr.client.solrj.io.stream.RankStream;
-import org.apache.solr.client.solrj.io.stream.ReducerStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
-import org.apache.solr.client.solrj.io.stream.SelectStream;
-import org.apache.solr.client.solrj.io.stream.SignificantTermsStream;
-import org.apache.solr.client.solrj.io.stream.SortStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TextLogitStream;
-import org.apache.solr.client.solrj.io.stream.TopicStream;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.UniqueStream;
-import org.apache.solr.client.solrj.io.stream.UpdateStream;
+import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@@ -223,6 +187,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("priority", PriorityStream.class)
.withFunctionName("significantTerms", SignificantTermsStream.class)
.withFunctionName("cartesianProduct", CartesianProductStream.class)
+ .withFunctionName("shuffle", ShuffleStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 1acd79d..7161dc4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -80,7 +80,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected String zkHost;
protected String collection;
protected SolrParams params;
- private Map<String, String> fieldMappings;
+ protected Map<String, String> fieldMappings;
protected StreamComparator comp;
private boolean trace;
protected transient Map<String, Tuple> eofTuples;
@@ -191,7 +191,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
// functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
// function name
- StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
// collection
expression.addParameter(collection);
@@ -254,7 +254,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
return explanation;
}
- private void init(String collectionName, String zkHost, SolrParams params) throws IOException {
+ protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
this.zkHost = zkHost;
this.collection = collectionName;
this.params = new ModifiableSolrParams(params);
@@ -405,7 +405,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
- ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+ ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+ mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.
Set<String> liveNodes = clusterState.getLiveNodes();
@@ -571,4 +572,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
}
}
+
+ protected ModifiableSolrParams adjustParams(ModifiableSolrParams params) { // extension hook: lets a subclass alter the copied request params; the caller applies it right before setting DISTRIB to "false"
+ return params; // base implementation hands the params back untouched
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
new file mode 100644
index 0000000..d30918b
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+
+/**
+ * Stream source behind the {@code shuffle} streaming expression. It is a
+ * {@link CloudSolrStream} whose only behavioral override is
+ * {@link #adjustParams(ModifiableSolrParams)}, which points the request at the
+ * {@code /export} handler.
+ *
+ * <p>Expression form:
+ * {@code shuffle(collection, name=value, ..., [aliases="orig=new,..."], [zkHost="..."])}
+ */
+public class ShuffleStream extends CloudSolrStream implements Expressible {
+
+  /** Request handler that {@link #adjustParams(ModifiableSolrParams)} forces onto every request. */
+  private static final String EXPORT_HANDLER = "/export";
+
+  /**
+   * Builds the stream from a parsed expression.
+   *
+   * @param expression the expression to read; its first operand is the collection name and
+   *                   every remaining operand must be a named parameter
+   * @param factory    used to extract operands and, when the expression carries no
+   *                   {@code zkHost}, to look one up for the collection or fall back to the default
+   * @throws IOException if the collection name is missing, an operand other than the collection
+   *                     name is not a named parameter, no named parameter is present, an alias is
+   *                     not of the form {@code origName=newName}, or no zkHost can be determined
+   */
+  public ShuffleStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // Pull every operand out up front so validation below works on plain locals.
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter aliasExpression = factory.getNamedOperand(expression, "aliases");
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection name is mandatory.
+    if (null == collectionName) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Everything except the collection name must be a named parameter. zkHost and aliases are
+    // themselves named parameters, so they are already included in namedParams.size().
+    if (expression.getParameters().size() != 1 + namedParams.size()) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
+    }
+
+    // At least one named parameter is required.
+    if (namedParams.isEmpty()) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    // Named parameters are handed to Solr as request params, except the two consumed here.
+    ModifiableSolrParams mParams = new ModifiableSolrParams();
+    for (StreamExpressionNamedParameter namedParam : namedParams) {
+      String name = namedParam.getName();
+      if (!name.equals("zkHost") && !name.equals("aliases")) {
+        mParams.add(name, namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // Aliases are optional: a comma-separated list of origName=newName pairs.
+    if (null != aliasExpression && aliasExpression.getParameter() instanceof StreamExpressionValue) {
+      fieldMappings = new HashMap<>();
+      String aliasList = ((StreamExpressionValue) aliasExpression.getParameter()).getValue();
+      for (String mapping : aliasList.split(",")) {
+        String[] parts = mapping.trim().split("=");
+        if (2 != parts.length) {
+          throw new IOException(String.format(Locale.ROOT,"invalid expression %s - alias expected of the format origName=newName",expression));
+        }
+        fieldMappings.put(parts[0], parts[1]);
+      }
+    }
+
+    // zkHost is optional in the expression; without it, ask the factory for the collection's
+    // zkHost and then for the default one.
+    String zkHost = null;
+    if (null == zkHostExpression) {
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if (zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
+      zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
+    }
+    if (null == zkHost) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // All required pieces are present; hand them to the parent's initializer.
+    init(collectionName, zkHost, mParams);
+  }
+
+  /**
+   * Sets the {@link CommonParams#QT} parameter to {@code /export}.
+   *
+   * @param mParams the request parameters to adjust; modified in place
+   * @return the same {@code mParams} instance that was passed in
+   */
+  @Override
+  public ModifiableSolrParams adjustParams(ModifiableSolrParams mParams) {
+    mParams.set(CommonParams.QT, EXPORT_HANDLER);
+    return mParams;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/37b6c605/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 581013f..bb0bd7e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -1096,7 +1096,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertTrue("blah blah blah 9".equals(t.getString("subject")));
//Change the batch size
- stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
+ stream = factory.constructStream("fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -1603,6 +1603,90 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
@Test
+ public void testParallelShuffleStream() throws Exception {
+
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+ .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+ .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+ .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+ .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+ .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
+ .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
+ .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
+ .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "9", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "10", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "11", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "12", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "13", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "14", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "15", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "16", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "17", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "18", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "19", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "20", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "21", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "22", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "23", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "24", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "25", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "26", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "27", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "28", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "29", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "30", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "31", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "32", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "33", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "34", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "35", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "36", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "37", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "38", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "39", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "40", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "41", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "42", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "43", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "44", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "45", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "46", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "47", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "48", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "49", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "50", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "51", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "52", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "53", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "54", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "55", "a_s", "hello1", "a_i", "13", "a_f", "4")
+ .add(id, "56", "a_s", "hello1", "a_i", "13", "a_f", "1000")
+
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ String zkHost = cluster.getZkServer().getZkAddress();
+ StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
+ .withFunctionName("shuffle", ShuffleStream.class)
+ .withFunctionName("unique", UniqueStream.class)
+ .withFunctionName("parallel", ParallelStream.class);
+
+ ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
+
+ List<Tuple> tuples = getTuples(pstream);
+ assert(tuples.size() == 6);
+ assertOrder(tuples, 0, 1, 3, 4, 6, 56);
+
+ //Test the eofTuples
+
+ Map<String,Tuple> eofTuples = pstream.getEofTuples();
+ assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+ assert(pstream.toExpression(streamFactory).toString().contains("shuffle"));
+ }
+
+
+ @Test
public void testParallelReducerStream() throws Exception {
new UpdateRequest()