Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/28 18:48:40 UTC

[05/10] lucene-solr:jira/http2: SOLR-12984: The search Streaming Expression should properly support and push down paging when using the /select handler

SOLR-12984: The search Streaming Expression should properly support and push down paging when using the /select handler
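
For a sense of what the change enables, here is a minimal SolrJ sketch, assuming a collection named "gettingstarted" and a ZooKeeper address of "localhost:9983" (both placeholders, not values from this commit). Lang.register() binds the "search" function to the new SearchFacadeStream (see the Lang.java hunk below), and because no qt is given the query goes through the /select handler, so rows and start are pushed down as ordinary paging parameters.

import java.io.IOException;

import org.apache.solr.client.solrj.io.Lang;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class SearchPagingSketch {
  public static void main(String[] args) throws IOException {
    StreamFactory factory = new StreamFactory().withDefaultZkHost("localhost:9983"); // placeholder zkHost
    Lang.register(factory); // binds "search" to SearchFacadeStream (see the Lang.java hunk below)

    // rows and start are passed straight through to the /select handler,
    // so this reads the third page of 10 documents.
    String expr = "search(gettingstarted, q=\"*:*\", fl=\"id\", sort=\"id asc\", rows=\"10\", start=\"20\")";

    TupleStream stream = factory.constructStream(expr);
    StreamContext context = new StreamContext();
    SolrClientCache cache = new SolrClientCache();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);
    try {
      stream.open();
      for (Tuple t = stream.read(); !t.EOF; t = stream.read()) {
        System.out.println(t.getString("id"));
      }
    } finally {
      stream.close();
      cache.close();
    }
  }
}

Adding qt="/export" to the same expression keeps the previous full-export behavior; that path, and the new rule that partitionKeys requires /export, show up in the test changes further down.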


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c2cac887
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c2cac887
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c2cac887

Branch: refs/heads/jira/http2
Commit: c2cac887702f9efc0a6bf75cd9f1e78f730c2c4f
Parents: 1534bbe
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Nov 27 11:21:42 2018 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Nov 27 11:22:02 2018 -0500

----------------------------------------------------------------------
 .../org/apache/solr/client/solrj/io/Lang.java   |   3 +-
 .../client/solrj/io/stream/CloudSolrStream.java |   6 +-
 .../solrj/io/stream/SearchFacadeStream.java     | 141 ++++++++++
 .../client/solrj/io/stream/SearchStream.java    | 269 +++++++++++++++++++
 .../client/solrj/io/stream/ShuffleStream.java   |  70 +++++
 .../apache/solr/client/solrj/io/TestLang.java   |   2 +-
 .../solrj/io/stream/MathExpressionTest.java     |   2 +-
 .../solrj/io/stream/StreamDecoratorTest.java    |  50 ++--
 .../solrj/io/stream/StreamExpressionTest.java   |  69 +++++
 .../client/solrj/io/stream/StreamingTest.java   |  32 +--
 10 files changed, 597 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 13c91ec..050fa7e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -36,7 +36,7 @@ public class Lang {
   public static void register(StreamFactory streamFactory) {
     streamFactory
         // source streams
-        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("search", SearchFacadeStream.class)
         .withFunctionName("facet", FacetStream.class)
         .withFunctionName("update", UpdateStream.class)
         .withFunctionName("jdbc", JDBCStream.class)
@@ -80,6 +80,7 @@ public class Lang {
         .withFunctionName("significantTerms", SignificantTermsStream.class)
         .withFunctionName("cartesianProduct", CartesianProductStream.class)
         .withFunctionName("shuffle", ShuffleStream.class)
+        .withFunctionName("export", ShuffleStream.class)
         .withFunctionName("calc", CalculatorStream.class)
         .withFunctionName("eval", EvalStream.class)
         .withFunctionName("echo", EchoStream.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index ddd9774..2cff0a7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -167,7 +167,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
     
     // function name
-    StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
+    StreamExpression expression = new StreamExpression("search");
     
     // collection
     expression.addParameter(collection);
@@ -206,7 +206,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
     StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
     
-    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setFunctionName("search");
     explanation.setImplementingClass(this.getClass().getName());
     explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
     explanation.setExpression(toExpression(factory).toString());
@@ -226,7 +226,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     return explanation;
   }
 
-  protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
+  void init(String collectionName, String zkHost, SolrParams params) throws IOException {
     this.zkHost = zkHost;
     this.collection = collectionName;
     this.params = new ModifiableSolrParams(params);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
new file mode 100644
index 0000000..5e8b549
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+/**
+ * A facade for the "search" Streaming Expression. If qt="/export" is specified the query is
+ * delegated to a CloudSolrStream, which merges the full sorted result set from the /export
+ * handler. Otherwise the query is delegated to a SearchStream, which sends a single request
+ * through the standard query path (the /select handler by default), so paging parameters
+ * such as rows and start are honored.
+ * @since 5.1.0
+ **/
+
+public class SearchFacadeStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+  private TupleStream innerStream;
+
+  public SearchFacadeStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+
+    ModifiableSolrParams mParams = new ModifiableSolrParams();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
+        mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    /*
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+    */
+
+    if(mParams.get(CommonParams.QT) != null && mParams.get(CommonParams.QT).equals("/export")) {
+      CloudSolrStream cloudSolrStream = new CloudSolrStream();
+      cloudSolrStream.init(collectionName, zkHost, mParams);
+      this.innerStream = cloudSolrStream;
+    } else {
+
+      if(mParams.get("partitionKeys") != null) {
+        throw new IOException("partitionKeys can only be used in the search function when the /export handler is specified");
+      }
+
+      SearchStream searchStream = new SearchStream();
+      searchStream.init(zkHost, collectionName, mParams);
+      this.innerStream = searchStream;
+    }
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    return ((Expressible)innerStream).toExpression(factory);
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    return innerStream.toExplanation(factory);
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.innerStream.setStreamContext(context);
+  }
+
+  /**
+   * Opens the inner stream.
+   **/
+  public void open() throws IOException {
+    innerStream.open();
+  }
+
+  public List<TupleStream> children() {
+    return innerStream.children();
+  }
+
+  /**
+   * Closes the inner stream.
+   **/
+  public void close() throws IOException {
+    innerStream.close();
+  }
+
+
+  public Tuple read() throws IOException {
+    return innerStream.read();
+  }
+
+  public StreamComparator getStreamSort(){
+    return innerStream.getStreamSort();
+  }
+}
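
To summarize the dispatch rules above: qt="/export" delegates to CloudSolrStream (the pre-existing sorted export path, where partitionKeys is legal), any other handler delegates to the new SearchStream, and partitionKeys without /export is rejected while the expression is built. A hedged construction-time sketch, with "collection1" and "localhost:9983" as placeholder values:

import java.io.IOException;

import org.apache.solr.client.solrj.io.stream.SearchFacadeStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class SearchFacadeDispatchSketch {
  public static void main(String[] args) throws IOException {
    StreamFactory factory = new StreamFactory().withDefaultZkHost("localhost:9983"); // placeholder

    // No qt: delegated to SearchStream, one request to /select, paging honored.
    TupleStream selectBacked = new SearchFacadeStream(
        StreamExpressionParser.parse(
            "search(collection1, q=\"*:*\", fl=\"id\", sort=\"id asc\", rows=\"100\")"),
        factory);

    // qt="/export": delegated to CloudSolrStream, full sorted stream, partitionKeys allowed.
    TupleStream exportBacked = new SearchFacadeStream(
        StreamExpressionParser.parse(
            "search(collection1, q=\"*:*\", fl=\"id\", sort=\"id asc\", qt=\"/export\", partitionKeys=\"id\")"),
        factory);

    // partitionKeys without /export is rejected at construction time.
    try {
      new SearchFacadeStream(
          StreamExpressionParser.parse(
              "search(collection1, q=\"*:*\", fl=\"id\", sort=\"id asc\", partitionKeys=\"id\")"),
          factory);
    } catch (IOException expected) {
      System.out.println(expected.getMessage());
    }
  }
}

This is also why the test expressions later in this commit gain qt="/export" wherever partitionKeys is used.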

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
new file mode 100644
index 0000000..2eeeb78
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+public class SearchStream extends TupleStream implements Expressible  {
+
+  private String zkHost;
+  private ModifiableSolrParams params;
+  private String collection;
+  protected transient SolrClientCache cache;
+  protected transient CloudSolrClient cloudSolrClient;
+  private Iterator<SolrDocument> documentIterator;
+  protected StreamComparator comp;
+
+  public SearchStream() {}
+
+
+  public SearchStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if(0 == namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    // pull out known named params
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("buckets") && !namedParam.getName().equals("bucketSorts") && !namedParam.getName().equals("limit")){
+        params.add(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items
+    init(zkHost, collectionName, params);
+  }
+
+  void init(String zkHost, String collection, ModifiableSolrParams params) throws IOException {
+    this.zkHost  = zkHost;
+    this.params   = params;
+
+    if(this.params.get(CommonParams.Q) == null) {
+      this.params.add(CommonParams.Q, "*:*");
+    }
+    this.collection = collection;
+    if(params.get(CommonParams.SORT) != null) {
+      this.comp = parseComp(params.get(CommonParams.SORT), params.get(CommonParams.FL));
+    }
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression("search");
+
+    // collection
+    expression.addParameter(collection);
+
+    for (Entry<String, String[]> param : params.getMap().entrySet()) {
+      for (String val : param.getValue()) {
+        // SOLR-8409: Escaping the " is a special case.
+        // Do note that in any other BASE streams with parameters where a " might come into play
+        // that this same replacement needs to take place.
+        expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
+            val.replace("\"", "\\\"")));
+      }
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName("search");
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
+    explanation.setExpression(toExpression(factory).toString());
+
+    // child is a datastore so add it at this point
+    StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
+    child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    child.setImplementingClass("Solr/Lucene");
+    child.setExpressionType(ExpressionType.DATASTORE);
+
+    explanation.addChild(child);
+
+    return explanation;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    cache = context.getSolrClientCache();
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    return l;
+  }
+
+  public void open() throws IOException {
+    if(cache != null) {
+      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    } else {
+      final List<String> hosts = new ArrayList<>();
+      hosts.add(zkHost);
+      cloudSolrClient = new CloudSolrClient.Builder(hosts, Optional.empty()).build();
+    }
+
+
+    QueryRequest request = new QueryRequest(params);
+    try {
+      QueryResponse response = request.process(cloudSolrClient, collection);
+      SolrDocumentList docs = response.getResults();
+      documentIterator = docs.iterator();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void close() throws IOException {
+    if(cache == null) {
+      cloudSolrClient.close();
+    }
+  }
+
+  public Tuple read() throws IOException {
+    if(documentIterator.hasNext()) {
+      Map<String, Object> map = new HashMap<>();
+      SolrDocument doc = documentIterator.next();
+      for(String key  : doc.keySet()) {
+        map.put(key, doc.get(key));
+      }
+      return new Tuple(map);
+    } else {
+      Map<String, Object> fields = new HashMap<>();
+      fields.put("EOF", true);
+      Tuple tuple = new Tuple(fields);
+      return tuple;
+    }
+  }
+
+
+  public int getCost() {
+    return 0;
+  }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return comp;
+  }
+
+  private StreamComparator parseComp(String sort, String fl) throws IOException {
+
+    HashSet<String> fieldSet = null;
+
+    if(fl != null) {
+      fieldSet = new HashSet<>();
+      String[] fls = fl.split(",");
+      for (String f : fls) {
+        fieldSet.add(f.trim()); //Handle spaces in the field list.
+      }
+    }
+
+    String[] sorts = sort.split(",");
+    StreamComparator[] comps = new StreamComparator[sorts.length];
+    for(int i=0; i<sorts.length; i++) {
+      String s = sorts[i];
+
+      String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+      if (spec.length != 2) {
+        throw new IOException("Invalid sort spec:" + s);
+      }
+
+      String fieldName = spec[0].trim();
+      String order = spec[1].trim();
+
+      if(fieldSet != null && !fieldSet.contains(spec[0])) {
+        throw new IOException("Fields in the sort spec must be included in the field list:"+spec[0]);
+      }
+
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+    }
+
+    if(comps.length > 1) {
+      return new MultipleFieldComparator(comps);
+    } else {
+      return comps[0];
+    }
+  }
+}
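
The sort handling in SearchStream can be exercised without a running cluster: each comma-separated clause of the sort parameter becomes a FieldComparator (wrapped in a MultipleFieldComparator when there is more than one), and every sort field must also appear in fl or construction fails. A hedged sketch, again with "collection1" and "localhost:9983" as placeholders:

import java.io.IOException;

import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.SearchStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class SearchStreamSortSketch {
  public static void main(String[] args) throws IOException {
    StreamFactory factory = new StreamFactory().withDefaultZkHost("localhost:9983"); // placeholder

    // Two sort clauses -> MultipleFieldComparator; both a_f and a_i appear in fl, as required.
    SearchStream stream = new SearchStream(
        StreamExpressionParser.parse(
            "search(collection1, q=\"*:*\", fl=\"id,a_i,a_f\", sort=\"a_f asc, a_i desc\")"),
        factory);
    StreamComparator comp = stream.getStreamSort();
    System.out.println(comp);

    // A sort field that is missing from fl fails fast with an IOException.
    try {
      new SearchStream(
          StreamExpressionParser.parse(
              "search(collection1, q=\"*:*\", fl=\"id\", sort=\"a_i asc\")"),
          factory);
    } catch (IOException expected) {
      System.out.println(expected.getMessage());
    }
  }
}

The comparator returned by getStreamSort() is what decorators such as merge() and parallel() consult when they combine this stream with others.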

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
index 559228f..0047e2e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ShuffleStream.java
@@ -21,8 +21,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
 
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@@ -98,6 +102,72 @@ public class ShuffleStream extends CloudSolrStream implements Expressible {
     init(collectionName, zkHost, mParams);
   }
 
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    for (Map.Entry<String, String[]> param : params.getMap().entrySet()) {
+      for (String val : param.getValue()) {
+        // SOLR-8409: Escaping the " is a special case.
+        // Do note that in any other BASE streams with parameters where a " might come into play
+        // that this same replacement needs to take place.
+        expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
+            val.replace("\"", "\\\"")));
+      }
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    // aliases
+    if(null != fieldMappings && 0 != fieldMappings.size()){
+      StringBuilder sb = new StringBuilder();
+      for(Map.Entry<String,String> mapping : fieldMappings.entrySet()){
+        if(sb.length() > 0){ sb.append(","); }
+        sb.append(mapping.getKey());
+        sb.append("=");
+        sb.append(mapping.getValue());
+      }
+
+      expression.addParameter(new StreamExpressionNamedParameter("aliases", sb.toString()));
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(Explanation.ExpressionType.STREAM_SOURCE);
+    explanation.setExpression(toExpression(factory).toString());
+
+    // child is a datastore so add it at this point
+    StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
+    child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    child.setImplementingClass("Solr/Lucene");
+    child.setExpressionType(Explanation.ExpressionType.DATASTORE);
+
+    if(null != params){
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+    }
+    explanation.addChild(child);
+
+    return explanation;
+  }
+
+
+
   public ModifiableSolrParams adjustParams(ModifiableSolrParams mParams) {
     mParams.set(CommonParams.QT, "/export");
     return mParams;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 3adac54..3b238c2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -73,7 +73,7 @@ public class TestLang extends LuceneTestCase {
       "outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
       "convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
       "getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
-      "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim"};
+      "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export"};
 
   @Test
   public void testLang() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index 0ccd691..a45683c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -5113,7 +5113,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     d = (double)tuples.get(1).get("kilometers");
     assertTrue(d == (double)(70*1.61));
 
-    expr = "parallel("+COLLECTIONORALIAS+", workers=2, sort=\"miles_i asc\", select(search("+COLLECTIONORALIAS+", q=\"*:*\", partitionKeys=miles_i, sort=\"miles_i asc\", fl=\"miles_i\"), convert(miles, kilometers, miles_i) as kilometers))";
+    expr = "parallel("+COLLECTIONORALIAS+", workers=2, sort=\"miles_i asc\", select(search("+COLLECTIONORALIAS+", q=\"*:*\", partitionKeys=miles_i, sort=\"miles_i asc\", fl=\"miles_i\", qt=\"/export\"), convert(miles, kilometers, miles_i) as kilometers))";
     paramsLoc = new ModifiableSolrParams();
     paramsLoc.set("expr", expr);
     paramsLoc.set("qt", "/stream");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index af79ea6..aa639d4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -283,7 +283,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     try {
 
       // Basic test
-      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), by=\"a_i asc\"))");
       stream.setStreamContext(streamContext);
       tuples = getTuples(stream);
       assertTrue(tuples.size() == 2);
@@ -697,7 +697,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
         .withFunctionName("val", RawValueEvaluator.class)
         .withFunctionName("parallel", ParallelStream.class);
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), eq(a_i, 9)))");
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), eq(a_i, 9)))");
     StreamContext context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -707,7 +707,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     Tuple t = tuples.get(0);
     assertTrue(t.getString("id").equals("9"));
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),lt(a_i, 10))))");
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), and(eq(a_i, 9),lt(a_i, 10))))");
     context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -717,7 +717,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     t = tuples.get(0);
     assertTrue(t.getString("id").equals("9"));
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), or(eq(a_i, 9),eq(a_i, 8))))");
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), or(eq(a_i, 9),eq(a_i, 8))))");
     context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -731,7 +731,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     assertTrue(t.getString("id").equals("9"));
 
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),not(eq(a_i, 9)))))");
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), and(eq(a_i, 9),not(eq(a_i, 9)))))");
     context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -740,7 +740,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     assert(tuples.size() == 0);
 
 
-    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(lteq(a_i, 9), gteq(a_i, 8))))");
+    stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id, qt=\"/export\"), and(lteq(a_i, 9), gteq(a_i, 8))))");
     context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -754,7 +754,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     t = tuples.get(1);
     assertTrue(t.getString("id").equals("9"));
 
-    stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f)), and(eq(sum(a_i), 9),eq(sum(a_i),9))))");
+    stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f, qt=\"/export\")), and(eq(sum(a_i), 9),eq(sum(a_i),9))))");
     context = new StreamContext();
     context.setSolrClientCache(solrClientCache);
     stream.setStreamContext(context);
@@ -901,7 +901,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
 
-      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\", qt=\"/export\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
       stream.setStreamContext(streamContext);
       tuples = getTuples(stream);
 
@@ -928,7 +928,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       assertTrue("blah blah blah 9".equals(t.getString("subject")));
 
 
-      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+      stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ",  search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\", qt=\"/export\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
       stream.setStreamContext(streamContext);
       tuples = getTuples(stream);
 
@@ -1274,7 +1274,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
 
-      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
       pstream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(pstream);
       assert (tuples.size() == 5);
@@ -1414,7 +1414,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     try {
       ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
           "reduce(" +
-          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
+          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\", qt=\"/export\"), " +
           "by=\"a_s\"," +
           "group(sort=\"a_i asc\", n=\"5\")), " +
           "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s asc\")");
@@ -1440,7 +1440,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
       pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
           "reduce(" +
-          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
+          "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\", qt=\"/export\"), " +
           "by=\"a_s\", " +
           "group(sort=\"a_i desc\", n=\"5\"))," +
           "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s desc\")");
@@ -1500,7 +1500,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel("
           + COLLECTIONORALIAS + ", "
           + "top("
-          + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+          + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"), "
           + "n=\"11\", "
           + "sort=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
       pstream.setStreamContext(streamContext);
@@ -1545,7 +1545,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     streamContext.setSolrClientCache(solrClientCache);
     try {
       //Test ascending
-      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
+      ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
       pstream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(pstream);
 
@@ -1554,7 +1554,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
       //Test descending
 
-      pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+      pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\", qt=\"/export\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\", qt=\"/export\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
       pstream.setStreamContext(streamContext);
       tuples = getTuples(pstream);
 
@@ -1606,7 +1606,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     try {
       expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
               + "rollup("
-              + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+              + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\", qt=\"/export\"),"
               + "over=\"a_s\","
               + "sum(a_i),"
               + "sum(a_f),"
@@ -2525,7 +2525,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
       //Copy all docs to destinationCollection
-      String updateExpression = "update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))";
+      String updateExpression = "update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\"))";
       TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
       parallelUpdateStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -2626,7 +2626,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
       //Copy all docs to destinationCollection
-      String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")), runInterval=\"1000\", id=\"test\")";
+      String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\")), runInterval=\"1000\", id=\"test\")";
       TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
       parallelUpdateStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -3015,7 +3015,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
       //Copy all docs to destinationCollection
-      String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")))";
+      String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\")))";
       TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
       parallelUpdateStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -3115,7 +3115,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
       //Copy all docs to destinationCollection
-      String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))), runInterval=\"1000\", id=\"test\")";
+      String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\", qt=\"/export\"))), runInterval=\"1000\", id=\"test\")";
       TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"batchNumber asc\")");
       parallelUpdateStream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(parallelUpdateStream);
@@ -3626,7 +3626,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
         .withFunctionName("parallel", ParallelStream.class)
         .withFunctionName("update", UpdateStream.class);
 
-    String executorExpression = "parallel(workQueue1, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue1, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))";
+    String executorExpression = "parallel(workQueue1, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue1, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\", qt=\"/export\")))";
     executorStream = factory.constructStream(executorExpression);
 
     StreamContext context = new StreamContext();
@@ -3699,8 +3699,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       final TupleStream stream = streamFactory.constructStream("parallel("
           + "collection1, "
           + "intersect("
-          + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\"),"
-          + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"),"
+          + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
+          + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
           + "on=\"a_i\"),"
           + "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
 
@@ -3935,8 +3935,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       final TupleStream stream = streamFactory.constructStream("parallel("
           + "collection1, "
           + "complement("
-          + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\"),"
-          + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"),"
+          + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
+          + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\", qt=\"/export\"),"
           + "on=\"a_i\"),"
           + "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 7e2451e..e0cc965 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -223,6 +223,66 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void testSearchFacadeStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+    try {
+      StringBuilder buf = new StringBuilder();
+      for (String shardUrl : shardUrls) {
+        if (buf.length() > 0) {
+          buf.append(",");
+        }
+        buf.append(shardUrl);
+      }
+
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "sort(search("+COLLECTIONORALIAS+"), by=\"a_i asc\")");
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 1, 2, 3, 4);
+      assertLong(tuples.get(0), "a_i", 0);
+      assertDouble(tuples.get(0), "a_f", 0);
+      assertString(tuples.get(0), "a_s", "hello0");
+
+      assertLong(tuples.get(1), "a_i", 1);
+      assertDouble(tuples.get(1), "a_f", 1);
+      assertString(tuples.get(1), "a_s", "hello1");
+
+      assertLong(tuples.get(2), "a_i", 2);
+      assertDouble(tuples.get(2), "a_f", 0);
+      assertString(tuples.get(2), "a_s", "hello2");
+
+      assertLong(tuples.get(3), "a_i", 3);
+      assertDouble(tuples.get(3), "a_f", 3);
+      assertString(tuples.get(3), "a_s", "hello3");
+
+      assertLong(tuples.get(4), "a_i", 4);
+      assertDouble(tuples.get(4), "a_f", 4);
+      assertString(tuples.get(4), "a_s", "hello4");
+
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
+
+  @Test
   public void testSqlStream() throws Exception {
 
     new UpdateRequest()
@@ -2662,6 +2722,15 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     return true;
   }
+
+  public boolean assertDouble(Tuple tuple, String fieldName, double d) throws Exception {
+    double dv = tuple.getDouble(fieldName);
+    if(dv != d) {
+      throw new Exception("Doubles not equal:"+d+" : "+dv);
+    }
+
+    return true;
+  }
   
   protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
     if(maps.size() != ids.length) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2cac887/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index ea3ec36..3085f2c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -180,7 +180,7 @@ public void testNonePartitionKeys() throws Exception {
   streamContext.setSolrClientCache(solrClientCache);
   try {
 
-    SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+    SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none", "qt", "/export");
     CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
     ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
     attachStreamFactory(pstream);
@@ -214,7 +214,7 @@ public void testParallelUniqueStream() throws Exception {
 
   try {
 
-    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f", "qt", "/export");
     CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
     UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
     ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
@@ -315,7 +315,7 @@ public void testParallelRankStream() throws Exception {
   SolrClientCache solrClientCache = new SolrClientCache();
   streamContext.setSolrClientCache(solrClientCache);
   try {
-    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
     CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
     RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
     ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
@@ -497,7 +497,7 @@ public void testParallelRankStream() throws Exception {
     streamContext.setSolrClientCache(solrClientCache);
 
     try {
-      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s", "qt", "/export");
       CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
       ReducerStream rstream = new ReducerStream(stream,
@@ -524,7 +524,7 @@ public void testParallelRankStream() throws Exception {
 
       //Test Descending with Ascending subsort
 
-      sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+      sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s", "qt", "/export");
       stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
       rstream = new ReducerStream(stream,
@@ -628,7 +628,7 @@ public void testParallelRankStream() throws Exception {
 
 
     //Test an error that originates from the /select handler
-    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s");
+    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s", "qt", "/export");
     stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
     pstream = new ParallelStream(zkHost, COLLECTIONORALIAS, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
     estream = new ExceptionStream(pstream);
@@ -1815,7 +1815,7 @@ public void testParallelRankStream() throws Exception {
 
     try {
       //Intentionally adding partitionKeys to trigger SOLR-12674
-      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s", "qt", "/export" );
       CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
       Bucket[] buckets = {new Bucket("a_s")};
@@ -1839,7 +1839,7 @@ public void testParallelRankStream() throws Exception {
       List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
       ModifiableSolrParams solrParams = new ModifiableSolrParams();
       solrParams.add("qt", "/stream");
-      solrParams.add("expr", "rollup(search(" + COLLECTIONORALIAS + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\"),over=\"a_s\")\n");
+      solrParams.add("expr", "rollup(search(" + COLLECTIONORALIAS + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\", qt=\"/export\"),over=\"a_s\")\n");
       SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
       streamContext = new StreamContext();
       solrStream.setStreamContext(streamContext);
@@ -1871,7 +1871,7 @@ public void testParallelRankStream() throws Exception {
     streamContext.setSolrClientCache(solrClientCache);
 
     try {
-      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s", "qt", "/export");
       CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
       Bucket[] buckets = {new Bucket("a_s")};
@@ -1990,7 +1990,7 @@ public void testParallelRankStream() throws Exception {
     SolrClientCache solrClientCache = new SolrClientCache();
     streamContext.setSolrClientCache(solrClientCache);
     try {
-      SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+      SolrParams sParamsA = mapParams("q", "a_s:blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s", "qt", "/export");
       CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
       ReducerStream rstream = new ReducerStream(stream,
           new FieldEqualitor("a_s"),
@@ -2151,10 +2151,10 @@ public void testParallelRankStream() throws Exception {
 
     try {
       //Test ascending
-      SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+      SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
       CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-      SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+      SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
       CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
 
       MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
@@ -2167,10 +2167,10 @@ public void testParallelRankStream() throws Exception {
       assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
 
       //Test descending
-      sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
+      sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i", "qt", "/export");
       streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-      sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
+      sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i", "qt", "/export");
       streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
 
       mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
@@ -2208,10 +2208,10 @@ public void testParallelRankStream() throws Exception {
 
     try {
       //Test ascending
-      SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+      SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
       CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
 
-      SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+      SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i", "qt", "/export");
       CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
 
       MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));