You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/28 18:48:40 UTC
[05/10] lucene-solr:jira/http2: SOLR-12984: The search Streaming
Expression should properly support and push down paging when using the
/select handler
SOLR-12984: The search Streaming Expression should properly support and push down paging when using the /select handler
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c2cac887
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c2cac887
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c2cac887
Branch: refs/heads/jira/http2
Commit: c2cac887702f9efc0a6bf75cd9f1e78f730c2c4f
Parents: 1534bbe
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Nov 27 11:21:42 2018 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Nov 27 11:22:02 2018 -0500
----------------------------------------------------------------------
.../org/apache/solr/client/solrj/io/Lang.java | 3 +-
.../client/solrj/io/stream/CloudSolrStream.java | 6 +-
.../solrj/io/stream/SearchFacadeStream.java | 141 ++++++++++
.../client/solrj/io/stream/SearchStream.java | 269 +++++++++++++++++++
.../client/solrj/io/stream/ShuffleStream.java | 70 +++++
.../apache/solr/client/solrj/io/TestLang.java | 2 +-
.../solrj/io/stream/MathExpressionTest.java | 2 +-
.../solrj/io/stream/StreamDecoratorTest.java | 50 ++--
.../solrj/io/stream/StreamExpressionTest.java | 69 +++++
.../client/solrj/io/stream/StreamingTest.java | 32 +--
10 files changed, 597 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 13c91ec..050fa7e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -36,7 +36,7 @@ public class Lang {
public static void register(StreamFactory streamFactory) {
streamFactory
// source streams
- .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("search", SearchFacadeStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class)
@@ -80,6 +80,7 @@ public class Lang {
.withFunctionName("significantTerms", SignificantTermsStream.class)
.withFunctionName("cartesianProduct", CartesianProductStream.class)
.withFunctionName("shuffle", ShuffleStream.class)
+ .withFunctionName("export", ShuffleStream.class)
.withFunctionName("calc", CalculatorStream.class)
.withFunctionName("eval", EvalStream.class)
.withFunctionName("echo", EchoStream.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index ddd9774..2cff0a7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -167,7 +167,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
// functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
// function name
- StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
+ StreamExpression expression = new StreamExpression("search");
// collection
expression.addParameter(collection);
@@ -206,7 +206,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
- explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+ explanation.setFunctionName("search");
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
@@ -226,7 +226,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
return explanation;
}
- protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
+ void init(String collectionName, String zkHost, SolrParams params) throws IOException {
this.zkHost = zkHost;
this.collection = collectionName;
this.params = new ModifiableSolrParams(params);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
new file mode 100644
index 0000000..5e8b549
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+/**
+ * Connects to Zookeeper to pick replicas from a specific collection to send the query to.
+ * Under the covers the SolrStream instances send the query to the replicas.
+ * SolrStreams are opened using a thread pool, but a single thread is used
+ * to iterate and merge Tuples from each SolrStream.
+ * @since 5.1.0
+ **/
+
+public class SearchFacadeStream extends TupleStream implements Expressible {
+
+ private static final long serialVersionUID = 1;
+ private TupleStream innerStream;
+
+ public SearchFacadeStream(StreamExpression expression, StreamFactory factory) throws IOException{
+ // grab all parameters out
+ String collectionName = factory.getValueOperand(expression, 0);
+ List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+ StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+ // Collection Name
+ if(null == collectionName){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+ }
+
+
+ ModifiableSolrParams mParams = new ModifiableSolrParams();
+ for(StreamExpressionNamedParameter namedParam : namedParams){
+ if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
+ mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
+ }
+ }
+
+ // zkHost, optional - if not provided then will look into factory list to get
+ String zkHost = null;
+ if(null == zkHostExpression){
+ zkHost = factory.getCollectionZkHost(collectionName);
+ if(zkHost == null) {
+ zkHost = factory.getDefaultZkHost();
+ }
+ }
+ else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+ zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+ }
+ /*
+ if(null == zkHost){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+ }
+ */
+
+ if(mParams.get(CommonParams.QT) != null && mParams.get(CommonParams.QT).equals("/export")) {
+ CloudSolrStream cloudSolrStream = new CloudSolrStream();
+ cloudSolrStream.init(collectionName, zkHost, mParams);
+ this.innerStream = cloudSolrStream;
+ } else {
+
+ if(mParams.get("partitionKeys") != null) {
+ throw new IOException("partitionKeys can only be used in the search function when the /export handler is specified");
+ }
+
+ SearchStream searchStream = new SearchStream();
+ searchStream.init(zkHost, collectionName, mParams);
+ this.innerStream = searchStream;
+ }
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ return ((Expressible)innerStream).toExpression(factory);
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+ return innerStream.toExplanation(factory);
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.innerStream.setStreamContext(context);
+ }
+
+ /**
+ * Opens the CloudSolrStream
+ *
+ ***/
+ public void open() throws IOException {
+ innerStream.open();
+ }
+
+ public List<TupleStream> children() {
+ return innerStream.children();
+ }
+
+ /**
+ * Closes the CloudSolrStream
+ **/
+ public void close() throws IOException {
+ innerStream.close();
+ }
+
+
+ public Tuple read() throws IOException {
+ return innerStream.read();
+ }
+
+ public StreamComparator getStreamSort(){
+ return innerStream.getStreamSort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
new file mode 100644
index 0000000..2eeeb78
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+/**
+ * Stream source that sends one query request for a collection through a {@link CloudSolrClient}
+ * and emits each document of the response as a {@link Tuple}. Once the documents are exhausted
+ * every further {@link #read()} returns a tuple carrying the {@code EOF} marker.
+ */
+public class SearchStream extends TupleStream implements Expressible {
+
+  private String zkHost;
+  private ModifiableSolrParams params;
+  private String collection;
+  protected transient SolrClientCache cache;
+  protected transient CloudSolrClient cloudSolrClient;
+  private Iterator<SolrDocument> documentIterator;
+  protected StreamComparator comp;
+
+  /** Creates an unconfigured stream; {@code init} must be called before it is used. */
+  public SearchStream() {}
+
+  /**
+   * Builds the stream from an expression.
+   *
+   * @param expression the expression; its first value operand is the collection name
+   * @param factory used to read the operands and to look up a zkHost when the expression has none
+   * @throws IOException if the collection name is missing, if there are no named parameters, if
+   *                     no zkHost can be resolved, or if the sort spec is invalid
+   */
+  public SearchStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection Name
+    if (null == collectionName) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if (0 == namedParams.size()) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    // pull out known named params
+    // NOTE(review): "buckets", "bucketSorts" and "limit" are filtered out here although nothing
+    // else in this class reads them - confirm whether this filter is intended for this stream.
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    for (StreamExpressionNamedParameter namedParam : namedParams) {
+      String name = namedParam.getName();
+      if (!name.equals("zkHost") && !name.equals("buckets") && !name.equals("bucketSorts") && !name.equals("limit")) {
+        solrParams.add(name, namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String resolvedZkHost = null;
+    if (null == zkHostExpression) {
+      resolvedZkHost = factory.getCollectionZkHost(collectionName);
+      if (resolvedZkHost == null) {
+        resolvedZkHost = factory.getDefaultZkHost();
+      }
+    } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
+      resolvedZkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
+    }
+    if (null == resolvedZkHost) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items
+    init(resolvedZkHost, collectionName, solrParams);
+  }
+
+  /**
+   * Stores the connection details and query parameters. The given {@code params} instance is
+   * kept (not copied) and gets {@code q=*:*} added when it has no {@code q}. When a sort is
+   * present the stream comparator is derived from it.
+   *
+   * @throws IOException if the sort spec is invalid
+   */
+  void init(String zkHost, String collection, ModifiableSolrParams params) throws IOException {
+    this.zkHost = zkHost;
+    this.params = params;
+
+    if (this.params.get(CommonParams.Q) == null) {
+      this.params.add(CommonParams.Q, "*:*");
+    }
+    this.collection = collection;
+    if (params.get(CommonParams.SORT) != null) {
+      this.comp = parseComp(params.get(CommonParams.SORT), params.get(CommonParams.FL));
+    }
+  }
+
+  /**
+   * Renders this stream as a {@code search} expression: the collection, one named operand per
+   * parameter value, and the zkHost.
+   */
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression("search");
+
+    // collection
+    expression.addParameter(collection);
+
+    for (Entry<String, String[]> param : params.getMap().entrySet()) {
+      for (String val : param.getValue()) {
+        // SOLR-8409: Escaping the " is a special case.
+        // Do note that in any other BASE streams with parameters where a " might come into play
+        // that this same replacement needs to take place.
+        expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
+            val.replace("\"", "\\\"")));
+      }
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    return expression;
+  }
+
+  /** Describes this stream as a stream source with a single datastore child for the collection. */
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName("search");
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
+    explanation.setExpression(toExpression(factory).toString());
+
+    // child is a datastore so add it at this point
+    StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
+    child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    child.setImplementingClass("Solr/Lucene");
+    child.setExpressionType(ExpressionType.DATASTORE);
+
+    explanation.addChild(child);
+
+    return explanation;
+  }
+
+  /** Takes the Solr client cache from the context; it is consulted when the stream is opened. */
+  @Override
+  public void setStreamContext(StreamContext context) {
+    cache = context.getSolrClientCache();
+  }
+
+  /** This stream wraps no other streams, so the returned list is always empty. */
+  @Override
+  public List<TupleStream> children() {
+    return new ArrayList<>();
+  }
+
+  /**
+   * Obtains a client (from the cache when one was supplied, otherwise a newly built one), runs
+   * the query and keeps an iterator over the returned documents.
+   *
+   * @throws IOException wrapping any exception raised while processing the request
+   */
+  @Override
+  public void open() throws IOException {
+    if (cache != null) {
+      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    } else {
+      final List<String> hosts = new ArrayList<>();
+      hosts.add(zkHost);
+      cloudSolrClient = new CloudSolrClient.Builder(hosts, Optional.empty()).build();
+    }
+
+    QueryRequest request = new QueryRequest(params);
+    try {
+      QueryResponse response = request.process(cloudSolrClient, collection);
+      SolrDocumentList docs = response.getResults();
+      documentIterator = docs.iterator();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes the client only when this stream built it itself (no cache was supplied). Does
+   * nothing when no client was ever assigned, so calling close() without a prior open() is safe.
+   */
+  @Override
+  public void close() throws IOException {
+    if (cache == null && cloudSolrClient != null) {
+      cloudSolrClient.close();
+    }
+  }
+
+  /**
+   * Returns the next document as a tuple holding all of its fields, or a tuple with
+   * {@code EOF=true} when no documents remain.
+   */
+  @Override
+  public Tuple read() throws IOException {
+    if (documentIterator.hasNext()) {
+      Map<String, Object> fields = new HashMap<>();
+      SolrDocument doc = documentIterator.next();
+      for (String key : doc.keySet()) {
+        fields.put(key, doc.get(key));
+      }
+      return new Tuple(fields);
+    } else {
+      Map<String, Object> eof = new HashMap<>();
+      eof.put("EOF", true);
+      return new Tuple(eof);
+    }
+  }
+
+
+  public int getCost() {
+    return 0;
+  }
+
+  /** Returns the comparator derived from the sort parameter, or null when no sort was given. */
+  @Override
+  public StreamComparator getStreamSort() {
+    return comp;
+  }
+
+  /**
+   * Turns a comma separated sort spec ("field order, field order, ...") into a comparator.
+   * An order of {@code asc} (any case) sorts ascending; any other order sorts descending.
+   *
+   * @param sort the sort spec; every entry must consist of exactly a field and an order
+   * @param fl the comma separated field list, or null to skip the field-list check
+   * @throws IOException if an entry is malformed or names a field that is not in {@code fl}
+   */
+  private StreamComparator parseComp(String sort, String fl) throws IOException {
+
+    HashSet<String> fieldSet = null;
+
+    if (fl != null) {
+      fieldSet = new HashSet<>();
+      for (String f : fl.split(",")) {
+        fieldSet.add(f.trim()); //Handle spaces in the field list.
+      }
+    }
+
+    String[] sorts = sort.split(",");
+    StreamComparator[] comps = new StreamComparator[sorts.length];
+    for (int i = 0; i < sorts.length; i++) {
+      String s = sorts[i];
+
+      String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+      if (spec.length != 2) {
+        throw new IOException("Invalid sort spec:" + s);
+      }
+
+      String fieldName = spec[0].trim();
+      String order = spec[1].trim();
+
+      if (fieldSet != null && !fieldSet.contains(spec[0])) {
+        throw new IOException("Fields in the sort spec must be included in the field list:"+spec[0]);
+      }
+
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+    }
+
+    if (comps.length > 1) {
+      return new MultipleFieldComparator(comps);
+    } else {
+      return comps[0];
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
index 559228f..0047e2e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
@@ -21,8 +21,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@@ -98,6 +102,72 @@ public class ShuffleStream extends CloudSolrStream implements Expressible {
init(collectionName, zkHost, mParams);
}
+  /**
+   * Renders this stream back into expression form: the function name registered for this class,
+   * the collection, one named operand per parameter value, the zkHost, and - when field mappings
+   * exist - an {@code aliases} operand listing them as comma separated {@code field=alias} pairs.
+   */
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
+    StreamExpression result = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // first operand: the collection
+    result.addParameter(collection);
+
+    // one named operand for every value of every parameter
+    for (Map.Entry<String, String[]> entry : params.getMap().entrySet()) {
+      String key = entry.getKey();
+      for (String value : entry.getValue()) {
+        // SOLR-8409: Escaping the " is a special case.
+        // Do note that in any other BASE streams with parameters where a " might come into play
+        // that this same replacement needs to take place.
+        String escaped = value.replace("\"", "\\\"");
+        result.addParameter(new StreamExpressionNamedParameter(key, escaped));
+      }
+    }
+
+    // zkHost
+    result.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    // aliases, only emitted when at least one mapping exists
+    if (fieldMappings != null && !fieldMappings.isEmpty()) {
+      String aliases = fieldMappings.entrySet().stream()
+          .map(mapping -> mapping.getKey() + "=" + mapping.getValue())
+          .collect(Collectors.joining(","));
+      result.addParameter(new StreamExpressionNamedParameter("aliases", aliases));
+    }
+
+    return result;
+  }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+ StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+ explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+ explanation.setImplementingClass(this.getClass().getName());
+ explanation.setExpressionType(Explanation.ExpressionType.STREAM_SOURCE);
+ explanation.setExpression(toExpression(factory).toString());
+
+ // child is a datastore so add it at this point
+ StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
+ child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+ child.setImplementingClass("Solr/Lucene");
+ child.setExpressionType(Explanation.ExpressionType.DATASTORE);
+
+ if(null != params){
+ ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+ child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+ }
+ explanation.addChild(child);
+
+ return explanation;
+ }
+
+
+
public ModifiableSolrParams adjustParams(ModifiableSolrParams mParams) {
mParams.set(CommonParams.QT, "/export");
return mParams;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 3adac54..3b238c2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -73,7 +73,7 @@ public class TestLang extends LuceneTestCase {
"outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
"convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
"getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
- "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim"};
+ "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export"};
@Test
public void testLang() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index 0ccd691..a45683c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -5113,7 +5113,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
d = (double)tuples.get(1).get("kilometers");
assertTrue(d == (double)(70*1.61));
- expr = "parallel("+COLLECTIONORALIAS+", workers=2, sort=\"miles_i asc\", select(search("+COLLECTIONORALIAS+", q=\"*:*\", partitionKeys=miles_i, sort=\"miles_i asc\", fl=\"miles_i\"), convert(miles, kilometers, miles_i) as kilometers))";
+ expr = "parallel("+COLLECTIONORALIAS+", workers=2, sort=\"miles_i asc\", select(search("+COLLECTIONORALIAS+", q=\"*:*\", partitionKeys=miles_i, sort=\"miles_i asc\", fl=\"miles_i\", qt=\"/export\"), convert(miles, kilometers, miles_i) as kilometers))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index af79ea6..aa639d4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -283,7 +283,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
// Basic test
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), by=\"a_i asc\"))");
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertTrue(tuples.size() == 2);
@@ -697,7 +697,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
.withFunctionName("val", RawValueEvaluator.class)
.withFunctionName("parallel", ParallelStream.class);
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), eq(a_i, 9)))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), eq(a_i, 9)))");
StreamContext context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -707,7 +707,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
Tuple t = tuples.get(0);
assertTrue(t.getString("id").equals("9"));
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),lt(a_i, 10))))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), and(eq(a_i, 9),lt(a_i, 10))))");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -717,7 +717,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
t = tuples.get(0);
assertTrue(t.getString("id").equals("9"));
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), or(eq(a_i, 9),eq(a_i, 8))))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), or(eq(a_i, 9),eq(a_i, 8))))");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -731,7 +731,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
assertTrue(t.getString("id").equals("9"));
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),not(eq(a_i, 9)))))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), and(eq(a_i, 9),not(eq(a_i, 9)))))");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -740,7 +740,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
assert(tuples.size() == 0);
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(lteq(a_i, 9), gteq(a_i, 8))))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), and(lteq(a_i, 9), gteq(a_i, 8))))");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -754,7 +754,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
t = tuples.get(1);
assertTrue(t.getString("id").equals("9"));
- stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f)), and(eq(sum(a_i), 9),eq(sum(a_i),9))))");
+ stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f, qt=\"/export\")), and(eq(sum(a_i), 9),eq(sum(a_i),9))))");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -901,7 +901,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\", qt=\"/export\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
@@ -928,7 +928,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
assertTrue("blah blah blah 9".equals(t.getString("subject")));
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\", qt=\"/export\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
@@ -1274,7 +1274,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
- ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
pstream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(pstream);
assert (tuples.size() == 5);
@@ -1414,7 +1414,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
"reduce(" +
- "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\", qt=\"/export\"), " +
"by=\"a_s\"," +
"group(sort=\"a_i asc\", n=\"5\")), " +
"workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s asc\")");
@@ -1440,7 +1440,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
"reduce(" +
- "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\", qt=\"/export\"), " +
"by=\"a_s\", " +
"group(sort=\"a_i desc\", n=\"5\"))," +
"workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s desc\")");
@@ -1500,7 +1500,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel("
+ COLLECTIONORALIAS + ", "
+ "top("
- + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+ + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"), "
+ "n=\"11\", "
+ "sort=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
pstream.setStreamContext(streamContext);
@@ -1545,7 +1545,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
streamContext.setSolrClientCache(solrClientCache);
try {
//Test ascending
- ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
pstream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(pstream);
@@ -1554,7 +1554,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
//Test descending
- pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+ pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\", qt=\"/export\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\", qt=\"/export\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
pstream.setStreamContext(streamContext);
tuples = getTuples(pstream);
@@ -1606,7 +1606,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
+ "rollup("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\", qt=\"/export\"),"
+ "over=\"a_s\","
+ "sum(a_i),"
+ "sum(a_f),"
@@ -2525,7 +2525,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
//Copy all docs to destinationCollection
- String updateExpression = "update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))";
+ String updateExpression = "update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\"))";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
parallelUpdateStream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -2626,7 +2626,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
//Copy all docs to destinationCollection
- String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")), runInterval=\"1000\", id=\"test\")";
+ String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\")), runInterval=\"1000\", id=\"test\")";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
parallelUpdateStream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -3015,7 +3015,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
//Copy all docs to destinationCollection
- String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")))";
+ String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\")))";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
parallelUpdateStream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -3115,7 +3115,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
//Copy all docs to destinationCollection
- String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))), runInterval=\"1000\", id=\"test\")";
+ String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\"))), runInterval=\"1000\", id=\"test\")";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
parallelUpdateStream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -3626,7 +3626,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("update", UpdateStream.class);
- String executorExpression = "parallel(workQueue1, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue1, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))";
+ String executorExpression = "parallel(workQueue1, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue1, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\", qt=\"/export\")))";
executorStream = factory.constructStream(executorExpression);
StreamContext context = new StreamContext();
@@ -3699,8 +3699,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
final TupleStream stream = streamFactory.constructStream("parallel("
+ "collection1, "
+ "intersect("
- + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\"),"
- + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"),"
+ + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
+ + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
+ "on=\"a_i\"),"
+ "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
@@ -3935,8 +3935,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
final TupleStream stream = streamFactory.constructStream("parallel("
+ "collection1, "
+ "complement("
- + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\"),"
- + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"),"
+ + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
+ + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
+ "on=\"a_i\"),"
+ "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 7e2451e..e0cc965 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -223,6 +223,66 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
@Test
+ public void testSearchFacadeStream() throws Exception {
+
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+ .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+ .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+ .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+ .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+ try {
+ StringBuilder buf = new StringBuilder();
+ for (String shardUrl : shardUrls) {
+ if (buf.length() > 0) {
+ buf.append(",");
+ }
+ buf.append(shardUrl);
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", "sort(search("+COLLECTIONORALIAS+"), by=\"a_i asc\")");
+ SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ solrStream.setStreamContext(streamContext);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 1, 2, 3, 4);
+ assertLong(tuples.get(0), "a_i", 0);
+ assertDouble(tuples.get(0), "a_f", 0);
+ assertString(tuples.get(0), "a_s", "hello0");
+
+ assertLong(tuples.get(1), "a_i", 1);
+ assertDouble(tuples.get(1), "a_f", 1);
+ assertString(tuples.get(1), "a_s", "hello1");
+
+ assertLong(tuples.get(2), "a_i", 2);
+ assertDouble(tuples.get(2), "a_f", 0);
+ assertString(tuples.get(2), "a_s", "hello2");
+
+ assertLong(tuples.get(3), "a_i", 3);
+ assertDouble(tuples.get(3), "a_f", 3);
+ assertString(tuples.get(3), "a_s", "hello3");
+
+ assertLong(tuples.get(4), "a_i", 4);
+ assertDouble(tuples.get(4), "a_f", 4);
+ assertString(tuples.get(4), "a_s", "hello4");
+
+ } finally {
+ solrClientCache.close();
+ }
+ }
+
+
+ @Test
public void testSqlStream() throws Exception {
new UpdateRequest()
@@ -2662,6 +2722,15 @@ public class StreamExpressionTest extends SolrCloudTestCase {
return true;
}
+
+ public boolean assertDouble(Tuple tuple, String fieldName, double d) throws Exception {
+ double dv = tuple.getDouble(fieldName);
+ if(dv != d) {
+ throw new Exception("Doubles not equal:"+d+" : "+dv);
+ }
+
+ return true;
+ }
protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
if(maps.size() != ids.length) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index ea3ec36..3085f2c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -180,7 +180,7 @@ public void testNonePartitionKeys() throws Exception {
streamContext.setSolrClientCache(solrClientCache);
try {
- SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+ SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none", "qt", "/export");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
@@ -214,7 +214,7 @@ public void testParallelUniqueStream() throws Exception {
try {
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f", "qt", "/export");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
@@ -315,7 +315,7 @@ public void testParallelRankStream() throws Exception {
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
try {
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
@@ -497,7 +497,7 @@ public void testParallelRankStream() throws Exception {
streamContext.setSolrClientCache(solrClientCache);
try {
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s", "qt", "/export");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
@@ -524,7 +524,7 @@ public void testParallelRankStream() throws Exception {
//Test Descending with Ascending subsort
- sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+ sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
rstream = new ReducerStream(stream,
@@ -628,7 +628,7 @@ public void testParallelRankStream() throws Exception {
//Test an error that originates from the /select handler
- sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s");
+ sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTIONORALIAS, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
@@ -1815,7 +1815,7 @@ public void testParallelRankStream() throws Exception {
try {
//Intentionally adding partitionKeys to trigger SOLR-12674
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s", "qt", "/export" );
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@@ -1839,7 +1839,7 @@ public void testParallelRankStream() throws Exception {
List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add("qt", "/stream");
- solrParams.add("expr", "rollup(search(" + COLLECTIONORALIAS + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\"),over=\"a_s\")\n");
+ solrParams.add("expr", "rollup(search(" + COLLECTIONORALIAS + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\", qt=\"/export\"),over=\"a_s\")\n");
SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
streamContext = new StreamContext();
solrStream.setStreamContext(streamContext);
@@ -1871,7 +1871,7 @@ public void testParallelRankStream() throws Exception {
streamContext.setSolrClientCache(solrClientCache);
try {
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s", "qt", "/export");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@@ -1990,7 +1990,7 @@ public void testParallelRankStream() throws Exception {
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
try {
- SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+ SolrParams sParamsA = mapParams("q", "a_s:blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s", "qt", "/export");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
@@ -2151,10 +2151,10 @@ public void testParallelRankStream() throws Exception {
try {
//Test ascending
- SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
@@ -2167,10 +2167,10 @@ public void testParallelRankStream() throws Exception {
assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
//Test descending
- sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
+ sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i", "qt", "/export");
streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
+ sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i", "qt", "/export");
streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
@@ -2208,10 +2208,10 @@ public void testParallelRankStream() throws Exception {
try {
//Test ascending
- SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));