You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/11 14:37:19 UTC

svn commit: r1678743 [1/3] - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/response/ server/solr/configsets/data_driven_schema_configs/conf/ solrj/src/java/org/apache/solr/client/solrj/io/ solrj/src/jav...

Author: jbernste
Date: Mon May 11 12:37:18 2015
New Revision: 1678743

URL: http://svn.apache.org/r1678743
Log:
SOLR-7377: Streaming Expressions

Added:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java   (with props)
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java   (with props)
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java   (with props)
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java   (with props)
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java   (with props)
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java   (with props)
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParserTest.java   (with props)
Removed:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/JSONTupleStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/RankStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/TupleStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
    lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
    lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Mon May 11 12:37:18 2015
@@ -17,32 +17,79 @@
 
 package org.apache.solr.handler;
 
+import java.util.Map.Entry;
+import java.net.URLDecoder;
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
-import java.net.URLDecoder;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.solr.client.solrj.io.SolrClientCache;
-import org.apache.solr.client.solrj.io.TupleStream;
-import org.apache.solr.client.solrj.io.StreamContext;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.MergeStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.ReducerStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.apache.solr.common.util.Base64;
 
-
 public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
 
   private SolrClientCache clientCache = new SolrClientCache();
-
+  private StreamFactory streamFactory = new StreamFactory();
+  
   public void inform(SolrCore core) {
-
-    core.addCloseHook( new CloseHook() {
+    
+    /* When the core container is ZooKeeper-aware, the stream factory is given the zkHost for this core's collection.
+     * The factory is then populated with the default streams and their function names. These
+     * defaults can be overridden or added to in solrconfig.xml, in the stream
+     * RequestHandler definition. Example config override:
+     *  <lst name="streamFunctions">
+     *    <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
+     *    <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+     *  </lst>
+     * */
+
+    String defaultCollection = null;
+    String defaultZkhost     = null;
+    CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
+
+    if(coreContainer.isZooKeeperAware()) {
+      defaultCollection = core.getCoreDescriptor().getCollectionName();
+      defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
+      streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+    }
+
+     streamFactory
+      .withStreamFunction("search", CloudSolrStream.class)
+      .withStreamFunction("merge", MergeStream.class)
+      .withStreamFunction("unique", UniqueStream.class)
+      .withStreamFunction("top", RankStream.class)
+      .withStreamFunction("group", ReducerStream.class)
+      .withStreamFunction("parallel", ParallelStream.class);
+
+    
+    // This pulls all the overrides and additions from the config
+    Object functionMappingsObj = initArgs.get("streamFunctions");
+    if(null != functionMappingsObj){
+      NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
+      for(Entry<String,?> functionMapping : functionMappings){
+        Class<?> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), ExpressibleStream.class);
+        streamFactory.withStreamFunction(functionMapping.getKey(), clazz);
+      }
+    }
+        
+    core.addCloseHook(new CloseHook() {
       @Override
       public void preClose(SolrCore core) {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -57,15 +104,23 @@ public class StreamHandler extends Reque
 
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     SolrParams params = req.getParams();
-    String encodedStream = params.get("stream");
-    encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
-    byte[] bytes = Base64.base64ToByteArray(encodedStream);
-    ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
-    ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
-    TupleStream tupleStream = (TupleStream)objectInputStream.readObject();
 
-    int worker = params.getInt("workerID");
-    int numWorkers = params.getInt("numWorkers");
+    boolean objectSerialize = params.getBool("objectSerialize", false);
+    TupleStream tupleStream = null;
+
+    if(objectSerialize) {
+      String encodedStream = params.get("stream");
+      encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
+      byte[] bytes = Base64.base64ToByteArray(encodedStream);
+      ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+      ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
+      tupleStream = (TupleStream)objectInputStream.readObject();
+    } else {
+      tupleStream = this.streamFactory.constructStream(params.get("stream"));
+    }
+
+    int worker = params.getInt("workerID", 0);
+    int numWorkers = params.getInt("numWorkers", 1);
     StreamContext context = new StreamContext();
     context.workerID = worker;
     context.numWorkers = numWorkers;
@@ -81,4 +136,4 @@ public class StreamHandler extends Reque
   public String getSource() {
     return null;
   }
-}
\ No newline at end of file
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Mon May 11 12:37:18 2015
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.io.Writer;
 import java.util.*;
 
-import org.apache.solr.client.solrj.io.TupleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.lucene.index.StorableField;
 import org.apache.lucene.index.StoredDocument;

Modified: lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml (original)
+++ lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml Mon May 11 12:37:18 2015
@@ -854,6 +854,37 @@
     </lst>
   </requestHandler>
 
+
+    <!--
+     The export request handler is used to export full sorted result sets.
+     Do not change these defaults.
+   -->
+
+    <requestHandler name="/export" class="solr.SearchHandler">
+        <lst name="invariants">
+            <str name="rq">{!xport}</str>
+            <str name="wt">xsort</str>
+            <str name="distrib">false</str>
+        </lst>
+
+        <arr name="components">
+            <str>query</str>
+        </arr>
+    </requestHandler>
+
+
+    <!--
+    Distributed Stream processing.
+    -->
+
+    <requestHandler name="/stream" class="solr.StreamHandler">
+        <lst name="invariants">
+            <str name="wt">json</str>
+            <str name="distrib">false</str>
+        </lst>
+    </requestHandler>
+
+
   <!-- A Robust Example
        
        This example SearchHandler declaration shows off usage of the

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Mon May 11 12:37:18 2015
@@ -93,11 +93,11 @@ public class Tuple implements Cloneable
     this.fields.put("_MAPS_", maps);
   }
 
-  public Map<String,Tuple> getMetrics() {
-    return (Map<String,Tuple>)this.fields.get("_METRICS_");
+  public Map<String,Map> getMetrics() {
+    return (Map<String,Map>)this.fields.get("_METRICS_");
   }
 
-  public void setMetrics(Map<String, Tuple> metrics) {
+  public void setMetrics(Map<String, Map> metrics) {
     this.fields.put("_METRICS_", metrics);
   }
 

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java Mon May 11 12:37:18 2015
@@ -0,0 +1,27 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import org.apache.solr.client.solrj.io.Tuple;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Single-method interface for a lambda that compares two Tuples and returns an int result.
+ */
+public interface ComparatorLambda {
+  public int compare(Tuple leftTuple, Tuple rightTuple);
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java Mon May 11 12:37:18 2015
@@ -0,0 +1,48 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import java.util.Locale;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Sort direction for comparators, parsed from and rendered as the strings "asc" and "desc".
+ */
+public enum ComparatorOrder {
+  ASCENDING, DESCENDING;
+  // Case-insensitive parse of "asc"/"desc"; any other value throws IllegalArgumentException.
+  public static ComparatorOrder fromString(String order){
+    switch(order.toLowerCase(Locale.ROOT)){
+      case "asc":
+        return ComparatorOrder.ASCENDING;
+      case "desc":
+        return ComparatorOrder.DESCENDING;
+      default:
+        throw new IllegalArgumentException(String.format(Locale.ROOT,"Unknown order '%s'", order));
+    }
+  }
+  // Returns "desc" for DESCENDING and "asc" for every other constant.
+  public String toString(){
+    switch(this){
+      case DESCENDING:
+        return "desc";
+      default:
+        return "asc";
+        
+    }
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java Mon May 11 12:37:18 2015
@@ -0,0 +1,33 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implemented by comparators that can render themselves as a StreamExpressionParameter via toExpression.
+ */
+public interface ExpressibleComparator {
+  StreamExpressionParameter toExpression(StreamFactory factory) throws IOException;
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java Mon May 11 12:37:18 2015
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ *  Compares leftField of the first Tuple with rightField of the second, in ascending or descending order.
+ **/
+public class FieldComparator extends StreamComparator implements Comparator<Tuple>, ExpressibleComparator, Serializable {
+
+  private static final long serialVersionUID = 1;
+  private ComparatorLambda comparator;
+
+  public FieldComparator(String field, ComparatorOrder order) {
+    super(field, order);
+    assignComparator();
+  }
+  public FieldComparator(String leftField, String rightField, ComparatorOrder order){
+    super(leftField,rightField,order);
+    assignComparator();
+  }
+  // Renders as "leftField order", or "leftField=rightField order" when the two field names differ.
+  public StreamExpressionParameter toExpression(StreamFactory factory){
+    StringBuilder sb = new StringBuilder();
+    
+    sb.append(leftField);
+    
+    if(!leftField.equals(rightField)){
+      sb.append("=");
+      sb.append(rightField); 
+    }
+    
+    sb.append(" ");
+    sb.append(order);
+    
+    return new StreamExpressionValue(sb.toString());
+  }
+  
+  /*
+   * What're we doing here messing around with lambdas for the comparator logic?
+   * We want the compare(...) function to run as fast as possible because it will be called many many
+   * times over the lifetime of this object. For that reason we want to limit the number of comparisons
+   * taking place in the compare(...) function. Because this class supports both ascending and
+   * descending comparisons and the logic for each is slightly different, we want to do the 
+   *   if(ascending){ compare like this } else { compare like this }
+   * check only once - we can do that in the constructor of this class, create a lambda, and then execute 
+   * that lambda in the compare function. A little bit of branch prediction savings right here.
+   */
+  private void assignComparator(){
+    if(ComparatorOrder.DESCENDING == order){
+      // What black magic is this type intersection??
+      // Because this class is serializable we need to make sure the lambda is also serializable.
+      // This can be done by providing this type intersection on the definition of the lambda.
+      // Why not do it in the lambda interface? Functional Interfaces don't allow extends clauses
+      comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
+        Comparable leftComp = (Comparable)leftTuple.get(leftField);
+        Comparable rightComp = (Comparable)rightTuple.get(rightField);
+        return rightComp.compareTo(leftComp);
+      };
+    }
+    else{
+      // See above for black magic reasoning.
+      comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
+        Comparable leftComp = (Comparable)leftTuple.get(leftField);
+        Comparable rightComp = (Comparable)rightTuple.get(rightField);
+        return leftComp.compareTo(rightComp);
+      };
+    }
+  }
+  // NOTE(review): the lambdas call compareTo on the values read from the Tuples; presumably the fields are never missing - confirm.
+  public int compare(Tuple leftTuple, Tuple rightTuple) {
+    return comparator.compare(leftTuple, rightTuple); 
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java Mon May 11 12:37:18 2015
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+ *  Wraps multiple Comparators; compare() returns the first non-zero result, so later comparators break ties.
+ **/
+
+public class MultiComp implements Comparator<Tuple>, ExpressibleComparator, Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  private Comparator<Tuple>[] comps;
+
+  public MultiComp(Comparator<Tuple>... comps) {
+    this.comps = comps;
+  }
+
+  public int compare(Tuple t1, Tuple t2) {
+    for(Comparator<Tuple> comp : comps) {
+      int i = comp.compare(t1, t2);
+      if(i != 0) {
+        return i;
+      }
+    }
+
+    return 0;
+  }
+  // Appends each wrapped comparator's toExpression() result, comma-separated; throws IOException if one is not an ExpressibleComparator.
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    for(Comparator<Tuple> comp : comps){
+      if(comp instanceof ExpressibleComparator){
+        if(sb.length() > 0){ sb.append(","); }
+        sb.append(((ExpressibleComparator)comp).toExpression(factory));
+      }
+      else{
+        throw new IOException("This MultiComp contains a non-expressible comparator - it cannot be converted to an expression");
+      }
+    }
+    
+    return new StreamExpressionValue(sb.toString());
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java Mon May 11 12:37:18 2015
@@ -0,0 +1,45 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Abstract base for Tuple comparators; holds the left and right field names and the sort order.
+ */
+public abstract class StreamComparator implements Comparator<Tuple>, Serializable {
+  protected String leftField;
+  protected String rightField;
+  protected final ComparatorOrder order;
+
+  public StreamComparator(String field, ComparatorOrder order) {
+    this.leftField = field;
+    this.rightField = field;
+    this.order = order;
+  }
+  public StreamComparator(String leftField, String rightField, ComparatorOrder order){
+    this.leftField = leftField;
+    this.rightField = rightField;
+    this.order = order;
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java Mon May 11 12:37:18 2015
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+/**
+ * Comparators for the Streaming Aggregation API
+ **/
+package org.apache.solr.client.solrj.io.comp;
+
+

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+
+/**
+ * Connects to Zookeeper to pick replicas from a specific collection to send the query to.
+ * Under the covers the SolrStream instances send the query to the replicas.
+ * SolrStreams are opened using a thread pool, but a single thread is used
+ * to iterate and merge Tuples from each SolrStream.
+ **/
+
+public class CloudSolrStream extends TupleStream implements ExpressibleStream {
+
+  private static final long serialVersionUID = 1;
+
+  // ---- configuration (non-transient) ----
+  // Zookeeper address handed to the CloudSolrClient in open().
+  protected String zkHost;
+  // Name of the collection whose active slices are queried.
+  protected String collection;
+  // Request parameters passed to every SolrStream; constructStreams() adds distrib=false to this map.
+  protected Map<String,String> params;
+  // Field name -> alias. Consulted by parseComp() and passed on to each SolrStream.
+  private Map<String, String> fieldMappings;
+  // Comparator built by init() from the "sort" and "fl" params; orders the merged tuples.
+  protected Comparator<Tuple> comp;
+  // The two timeouts below are not read anywhere in this class.
+  private int zkConnectTimeout = 10000;
+  private int zkClientTimeout = 10000;
+  // Copied from the StreamContext in setStreamContext(); not read anywhere else in this class.
+  private int numWorkers;
+  private int workerID;
+  // When true, read() adds a _COLLECTION_ entry to every tuple it returns.
+  private boolean trace;
+  // ---- runtime state (transient) ----
+  // Base URL of each exhausted SolrStream -> its EOF tuple; created in open().
+  protected transient Map<String, Tuple> eofTuples;
+  // Taken from the StreamContext; when null, open() creates its own CloudSolrClient and close() closes it.
+  protected transient SolrClientCache cache;
+  protected transient CloudSolrClient cloudSolrClient;
+  // One stream per active slice; created in constructStreams().
+  protected transient List<TupleStream> solrStreams;
+  // Holds the current head tuple of every still-active SolrStream, in comparator order.
+  protected transient TreeSet<TupleWrapper> tuples;
+  protected transient StreamContext streamContext;
+
+  // Used by parallel stream
+  protected CloudSolrStream(){
+    // Intentionally empty: every field keeps its default value.
+  }
+  /**
+   * Creates a stream over the given collection by delegating to init().
+   *
+   * @param zkHost         Zookeeper address used by open() to obtain a CloudSolrClient
+   * @param collectionName collection to query
+   * @param params         request parameters; init() requires the keys "fl" and "sort"
+   * @throws IOException if init() rejects the parameters
+   */
+  public CloudSolrStream(String zkHost, String collectionName, Map params) throws IOException {
+    init(collectionName, zkHost, params);
+  }
+
+  /**
+   * Builds the stream from a parsed expression. The first operand is the collection
+   * name; every named operand other than "zkHost" and "aliases" becomes a request
+   * parameter. "aliases" is a comma separated list of origName=newName pairs and
+   * "zkHost" falls back to the factory's mapping for the collection when absent.
+   *
+   * @throws IOException if the expression has unknown operands, no collection name,
+   *         no named parameters, a malformed alias, or no resolvable zkHost
+   */
+  public CloudSolrStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // Pull every operand of interest out of the expression up front.
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedOperands = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter aliasOperand = factory.getNamedOperand(expression, "aliases");
+    StreamExpressionNamedParameter zkHostOperand = factory.getNamedOperand(expression, "zkHost");
+
+    // The expression may hold the collection name plus named operands and nothing else
+    // (zkHost and aliases are themselves named operands, so they are already counted).
+    int expectedOperandCount = namedOperands.size() + 1;
+    if(expectedOperandCount != expression.getParameters().size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
+    }
+
+    if(collectionName == null){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    if(namedOperands.size() == 0){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    // Everything except the two operands handled below is forwarded to Solr as a request parameter.
+    Map<String,String> solrParams = new HashMap<String,String>();
+    for(StreamExpressionNamedParameter operand : namedOperands){
+      String operandName = operand.getName();
+      if(operandName.equals("zkHost") || operandName.equals("aliases")){
+        continue;
+      }
+      solrParams.put(operandName, operand.getParameter().toString().trim());
+    }
+
+    // Optional aliases: "orig1=new1,orig2=new2".
+    if(aliasOperand != null && aliasOperand.getParameter() instanceof StreamExpressionValue){
+      fieldMappings = new HashMap<String,String>();
+      String aliasList = ((StreamExpressionValue)aliasOperand.getParameter()).getValue();
+      for(String mapping : aliasList.split(",")){
+        String[] parts = mapping.trim().split("=");
+        if(parts.length != 2){
+          throw new IOException(String.format(Locale.ROOT,"invalid expression %s - alias expected of the format origName=newName",expression));
+        }
+        fieldMappings.put(parts[0], parts[1]);
+      }
+    }
+
+    // Optional zkHost: without the operand, ask the factory which zkHost serves the collection.
+    String resolvedZkHost = null;
+    if(zkHostOperand == null){
+      resolvedZkHost = factory.getCollectionZkHost(collectionName);
+    }
+    else if(zkHostOperand.getParameter() instanceof StreamExpressionValue){
+      resolvedZkHost = ((StreamExpressionValue)zkHostOperand.getParameter()).getValue();
+    }
+    if(resolvedZkHost == null){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // All required pieces are present.
+    init(collectionName, resolvedZkHost, solrParams);
+  }
+  
+  /**
+   * Renders this stream as an expression of the form
+   * functionName(collectionName, param1, ..., paramN, zkHost="...", [aliases="field=alias,..."]).
+   * The aliases operand is only emitted when field mappings are present.
+   */
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StreamExpression result = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // First operand: the collection.
+    result.addParameter(collection);
+
+    // One named operand per request parameter.
+    for(Entry<String,String> entry : params.entrySet()){
+      result.addParameter(new StreamExpressionNamedParameter(entry.getKey(), entry.getValue()));
+    }
+
+    result.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    // Aliases are joined back into the "orig=alias,orig=alias" form the constructor parses.
+    if(fieldMappings != null && fieldMappings.size() != 0){
+      StringBuilder aliases = new StringBuilder();
+      String separator = "";
+      for(Entry<String,String> alias : fieldMappings.entrySet()){
+        aliases.append(separator);
+        aliases.append(alias.getKey()).append("=").append(alias.getValue());
+        separator = ",";
+      }
+      result.addParameter(new StreamExpressionNamedParameter("aliases", aliases.toString()));
+    }
+
+    return result;
+  }
+  
+  /**
+   * Stores the connection settings and builds the merge comparator.
+   * The comparator is always derived from the "sort" and "fl" parameters; aliases
+   * known at this point are applied by parseComp(), so sorting on fieldA aliased to
+   * alias.fieldA compares on alias.fieldA.
+   *
+   * @throws IOException if either the "fl" or the "sort" parameter is missing
+   */
+  private void init(String collectionName, String zkHost, Map params) throws IOException {
+    this.zkHost = zkHost;
+    this.collection = collectionName;
+    this.params = params;
+
+    // Both parameters are mandatory; check them in a fixed order so the first missing one is reported.
+    for(String required : new String[]{"fl", "sort"}){
+      if(!params.containsKey(required)){
+        throw new IOException(required + " param expected for a stream");
+      }
+    }
+
+    String sortSpec = (String)params.get("sort");
+    String fieldList = (String)params.get("fl");
+    this.comp = parseComp(sortSpec, fieldList);
+  }
+  
+  /**
+   * Replaces the field-name-to-alias map that constructStreams() passes to each SolrStream.
+   * Only the field is assigned here; the comparator built earlier by init() is not rebuilt.
+   */
+  public void setFieldMappings(Map<String, String> fieldMappings) {
+    this.fieldMappings = fieldMappings;
+  }
+
+  /** When enabled, read() adds a _COLLECTION_ entry holding the collection name to each tuple. */
+  public void setTrace(boolean trace) {
+    this.trace = trace;
+  }
+
+  /**
+   * Copies worker information and the SolrClientCache out of the context and keeps the
+   * context itself so constructStreams() can pass it to every SolrStream.
+   */
+  public void setStreamContext(StreamContext context) {
+    this.numWorkers = context.numWorkers;
+    this.workerID = context.workerID;
+    this.cache = context.getSolrClientCache();
+    this.streamContext = context;
+  }
+
+  /**
+   * Opens the CloudSolrStream: resets the per-run state, obtains a CloudSolrClient
+   * (from the shared cache when one was supplied, otherwise a private, freshly
+   * connected one), then builds and opens one SolrStream per active slice.
+   **/
+  public void open() throws IOException {
+    this.tuples = new TreeSet<TupleWrapper>();
+    this.solrStreams = new ArrayList<TupleStream>();
+    this.eofTuples = Collections.synchronizedMap(new HashMap<String, Tuple>());
+
+    if(this.cache == null) {
+      // No shared cache: this stream owns the client and close() will close it.
+      this.cloudSolrClient = new CloudSolrClient(zkHost);
+      this.cloudSolrClient.connect();
+    } else {
+      this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
+    }
+
+    constructStreams();
+    openStreams();
+  }
+
+
+  /**
+   * Returns the map that TupleWrapper.next() fills with the EOF tuple of each exhausted
+   * SolrStream, keyed by that stream's base URL. The map is created in open().
+   */
+  public Map getEofTuples() {
+    return this.eofTuples;
+  }
+
+  /** Returns the list of SolrStreams built by constructStreams() (the list itself, not a copy). */
+  public List<TupleStream> children() {
+    return solrStreams;
+  }
+
+  /**
+   * Builds the comparator used to merge tuples from the individual SolrStreams.
+   *
+   * @param sort comma separated sort clauses, each of the form "field direction"
+   * @param fl   comma separated field list; every sort field must appear in it
+   * @return a single FieldComparator, or a MultiComp when there are several clauses
+   * @throws IOException if the sort is empty, a clause is not exactly "field direction",
+   *         or a sort field is missing from the field list
+   */
+  private Comparator<Tuple> parseComp(String sort, String fl) throws IOException {
+
+    HashSet<String> fieldSet = new HashSet<String>();
+    for(String f : fl.split(",")) {
+      fieldSet.add(f.trim()); //Handle spaces in the field list.
+    }
+
+    String[] sorts = sort.split(",");
+    if(sorts.length == 0) {
+      // e.g. sort="," - String.split drops trailing empty strings, leaving nothing to sort on.
+      throw new IOException("Invalid sort spec:" + sort);
+    }
+
+    Comparator[] comps = new Comparator[sorts.length];
+    for(int i=0; i<sorts.length; i++) {
+      String s = sorts[i];
+
+      String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+      // A clause without a direction (or with extra tokens) used to fail with an
+      // ArrayIndexOutOfBoundsException or be silently truncated; report it instead.
+      if(spec.length != 2) {
+        throw new IOException("Invalid sort spec:" + s);
+      }
+
+      String fieldName = spec[0];
+      String order = spec[1];
+
+      if(!fieldSet.contains(fieldName)) {
+        throw new IOException("Fields in the sort spec must be included in the field list:"+fieldName);
+      }
+
+      // if there's an alias for the field then use the alias
+      if(null != fieldMappings && fieldMappings.containsKey(fieldName)){
+        fieldName = fieldMappings.get(fieldName);
+      }
+
+      // Anything other than "asc" (case-insensitive) is treated as descending.
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+    }
+
+    if(comps.length > 1) {
+      return new MultiComp(comps);
+    } else {
+      return comps[0];
+    }
+  }
+
+  /**
+   * Creates one SolrStream per active slice of the collection. For each slice a replica
+   * is chosen by shuffling the slice's replicas and taking the first one. The request
+   * parameters get distrib=false because this stream does the merging itself.
+   *
+   * @throws IOException wrapping any exception raised while reading cluster state or building streams
+   */
+  protected void constructStreams() throws IOException {
+
+    try {
+
+      ClusterState clusterState = cloudSolrClient.getZkStateReader().getClusterState();
+      Collection<Slice> activeSlices = clusterState.getActiveSlices(this.collection);
+
+      // The same seed is used for every slice of this call.
+      long seed = System.currentTimeMillis();
+      params.put("distrib","false"); // We are the aggregator.
+
+      for(Slice slice : activeSlices) {
+        List<Replica> candidates = new ArrayList<Replica>(slice.getReplicas());
+        Collections.shuffle(candidates, new Random(seed));
+
+        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(candidates.get(0));
+        SolrStream solrStream = new SolrStream(nodeProps.getCoreUrl(), params);
+        if(streamContext != null) {
+          solrStream.setStreamContext(streamContext);
+        }
+        solrStream.setFieldMappings(this.fieldMappings);
+        solrStreams.add(solrStream);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Opens every SolrStream on a thread pool and seeds the merge set with the first
+   * tuple of each stream that returned at least one tuple. Streams that are empty
+   * yield a null wrapper and are left out.
+   *
+   * @throws IOException wrapping any failure while opening a stream or waiting for it;
+   *         if the wait was interrupted the thread's interrupt flag is set again first
+   */
+  private void openStreams() throws IOException {
+    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream"));
+    try {
+      List<Future<TupleWrapper>> futures = new ArrayList<Future<TupleWrapper>>();
+      for (TupleStream solrStream : solrStreams) {
+        StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
+        Future<TupleWrapper> future = service.submit(so);
+        futures.add(future);
+      }
+
+      try {
+        for (Future<TupleWrapper> f : futures) {
+          TupleWrapper w = f.get();
+          if (w != null) {
+            tuples.add(w);
+          }
+        }
+      } catch (InterruptedException e) {
+        // Wrapping alone would hide the interruption from callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } finally {
+      service.shutdown();
+    }
+  }
+
+  /**
+   *  Closes the CloudSolrStream: closes every SolrStream and, when the CloudSolrClient
+   *  was created by open() rather than taken from a shared cache, closes that client too.
+   *  Safe to call when open() was never called or failed part-way; the privately owned
+   *  client is closed even if closing one of the SolrStreams throws.
+   **/
+  public void close() throws IOException {
+    try {
+      // solrStreams is only assigned in open().
+      if(solrStreams != null) {
+        for(TupleStream solrStream : solrStreams) {
+          solrStream.close();
+        }
+      }
+    } finally {
+      // A cached client belongs to the cache; only close the one this stream created.
+      if(cache == null && cloudSolrClient != null) {
+        cloudSolrClient.close();
+      }
+    }
+  }
+
+  /** Returns the next tuple in comparator order, or an EOF tuple once all streams are exhausted. */
+  public Tuple read() throws IOException {
+    return _read();
+  }
+
+  /**
+   * Takes the smallest head tuple out of the merge set, advances the stream it came
+   * from and re-inserts that stream if it produced another tuple. When the set is
+   * empty a tuple carrying EOF=true is returned instead.
+   */
+  protected Tuple _read() throws IOException {
+    TupleWrapper head = tuples.pollFirst();
+
+    if(head == null) {
+      // Every underlying stream has been drained.
+      Map eof = new HashMap();
+      if(trace) {
+        eof.put("_COLLECTION_", this.collection);
+      }
+      eof.put("EOF", true);
+      return new Tuple(eof);
+    }
+
+    Tuple current = head.getTuple();
+    if (trace) {
+      current.put("_COLLECTION_", this.collection);
+    }
+
+    // Pull the stream's next tuple; it only goes back into the set if it is not EOF.
+    if(head.next()) {
+      tuples.add(head);
+    }
+    return current;
+  }
+
+  /**
+   * Pairs a SolrStream with the tuple most recently read from it so that the streams
+   * can be kept in a TreeSet ordered by their current tuple.
+   */
+  protected class TupleWrapper implements Comparable<TupleWrapper> {
+    private Tuple tuple;
+    private SolrStream stream;
+    private Comparator comp;
+
+    public TupleWrapper(SolrStream stream, Comparator comp) {
+      this.comp = comp;
+      this.stream = stream;
+    }
+
+    /**
+     * Orders wrappers by their current tuples. Two distinct wrappers never compare
+     * as equal: a tie is reported as 1 so the TreeSet keeps both of them.
+     */
+    public int compareTo(TupleWrapper w) {
+      if(this == w) {
+        return 0;
+      }
+
+      int result = comp.compare(tuple, w.tuple);
+      return result == 0 ? 1 : result;
+    }
+
+    /** Identity equality only. */
+    public boolean equals(Object o) {
+      return this == o;
+    }
+
+    public Tuple getTuple() {
+      return tuple;
+    }
+
+    /**
+     * Reads the next tuple from the stream. An EOF tuple is recorded in eofTuples
+     * under the stream's base URL.
+     *
+     * @return true if a regular tuple was read, false on EOF
+     */
+    public boolean next() throws IOException {
+      this.tuple = stream.read();
+
+      boolean exhausted = tuple.EOF;
+      if(exhausted) {
+        eofTuples.put(stream.getBaseUrl(), tuple);
+      }
+
+      return !exhausted;
+    }
+  }
+
+  /**
+   * Task that opens one SolrStream and reads its first tuple, so that streams can be
+   * opened concurrently by openStreams().
+   */
+  protected class StreamOpener implements Callable<TupleWrapper> {
+
+    private SolrStream stream;
+    private Comparator<Tuple> comp;
+
+    public StreamOpener(SolrStream stream, Comparator<Tuple> comp) {
+      this.comp = comp;
+      this.stream = stream;
+    }
+
+    /** Returns a wrapper positioned on the first tuple, or null when the stream is empty. */
+    public TupleWrapper call() throws Exception {
+      stream.open();
+      TupleWrapper opened = new TupleWrapper(stream, comp);
+      return opened.next() ? opened : null;
+    }
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,30 @@
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines a stream that can be expressed in an expression
+ */
+public interface ExpressibleStream {
+  /**
+   * Builds the expression form of this stream.
+   *
+   * @param factory factory made available to the implementation while building the expression
+   * @return the expression parameter representing this stream
+   * @throws IOException if the stream cannot be converted to an expression
+   */
+  StreamExpressionParameter toExpression(StreamFactory factory) throws IOException;
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,165 @@
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.noggit.JSONParser;
+import org.noggit.ObjectBuilder;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+  Queries a Solr instance, and maps SolrDocs to Tuples.
+  Initial version works with the json format and only SolrDocs are handled.
+*/
+
+public class JSONTupleStream {
+  private List<String> path;  // future... for more general stream handling
+  // Source of the JSON text; closed by close().
+  private Reader reader;
+  // Event parser reading from the reader above.
+  private JSONParser parser;
+  // Set on the first call to next(), once the parser has been advanced towards the "docs" array.
+  private boolean atDocs;
+
+  /** Wraps the reader in a JSONParser; nothing is read until next() is called. */
+  public JSONTupleStream(Reader reader) {
+    this.reader = reader;
+    this.parser = new JSONParser(reader);
+  }
+
+  // temporary...
+  /**
+   * Sends the parameters to the server as a POST query asking for a raw json response
+   * and wraps the response body in a JSONTupleStream. A "qt" parameter, when present,
+   * is removed from the parameters (which must then be ModifiableSolrParams) and used
+   * as the request path.
+   */
+  public static JSONTupleStream create(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
+    String handlerPath = requestParams.get("qt");
+    if(handlerPath != null) {
+      // qt travels as the request path, not as a parameter.
+      ((ModifiableSolrParams) requestParams).remove("qt");
+    }
+
+    QueryRequest request = new QueryRequest( requestParams );
+    request.setPath(handlerPath);
+    request.setResponseParser(new InputStreamResponseParser("json"));
+    request.setMethod(SolrRequest.METHOD.POST);
+
+    NamedList<Object> response = server.request(request);
+    InputStream body = (InputStream)response.get("stream");
+    return new JSONTupleStream(new InputStreamReader(body, "UTF-8"));
+  }
+
+
+  /** returns the next Tuple or null */
+  public Map<String,Object> next() throws IOException {
+    if (!atDocs) {
+      // First call only: move the parser to the "docs" array. atDocs is set even when
+      // the key was not found, so the search is never repeated.
+      boolean found = advanceToDocs();
+      atDocs = true;
+      if (!found) return null;
+    }
+    // advance past ARRAY_START (in the case that we just advanced to docs), or OBJECT_END left over from the last call.
+    int event = parser.nextEvent();
+    // End of the docs array: no more tuples.
+    if (event == JSONParser.ARRAY_END) return null;
+
+    // Build the whole value at the current position; the cast assumes each array element is a JSON object.
+    Object o = ObjectBuilder.getVal(parser);
+    // right now, getVal will leave the last event read as OBJECT_END
+
+    return (Map<String,Object>)o;
+  }
+
+  /** Closes the underlying reader. */
+  public void close() throws IOException {
+    reader.close();
+  }
+
+
+  /**
+   * Reads the next parser event and fails unless it is the expected one.
+   *
+   * @throws IOException naming both the expected and the actual event on a mismatch
+   */
+  private void expect(int parserEventType) throws IOException {
+    int event = parser.nextEvent();
+    if (event != parserEventType) {
+      throw new IOException("JSONTupleStream: expected " + JSONParser.getEventString(parserEventType) + " but got " + JSONParser.getEventString(event) );
+    }
+  }
+
+  // Empty placeholder: does nothing and is not called anywhere in this class.
+  private void expect(String mapKey) {
+
+
+  }
+
+  /**
+   * Consumes parser events of the current object until a STRING event equal to
+   * {@code key} is seen (returns true) or the object ends (returns false).
+   * Every STRING event is compared, so a string value equal to the key matches as well.
+   *
+   * @param key        text to look for; null means "just skip to the end of this object"
+   * @param deepSearch when true nested objects are searched for the key as well,
+   *                   when false they are skipped
+   */
+  private boolean advanceToMapKey(String key, boolean deepSearch) throws IOException {
+    // NOTE(review): only OBJECT_END and a match leave this loop; there is no case for an
+    // end-of-input event, so behaviour on truncated input depends on the parser - confirm.
+    for (;;) {
+      int event = parser.nextEvent();
+      switch (event) {
+        case JSONParser.STRING:
+          if (key != null) {
+            String val = parser.getString();
+            if (key.equals(val)) {
+              return true;
+            }
+          }
+          break;
+        case JSONParser.OBJECT_END:
+          // End of the object this call was scanning.
+          return false;
+        case JSONParser.OBJECT_START:
+          if (deepSearch) {
+            // Search the nested object; a hit there ends the whole search.
+            boolean found = advanceToMapKey(key, true);
+            if (found) {
+              return true;
+            }
+          } else {
+            // Skip the nested object entirely (a null key never matches).
+            advanceToMapKey(null, false);
+          }
+          break;
+        case JSONParser.ARRAY_START:
+          skipArray(key, deepSearch);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Consumes parser events up to and including the ARRAY_END that closes the current array.
+   * Objects inside the array are walked with advanceToMapKey, but its result is discarded
+   * here, so a key found inside an array does not stop the scan.
+   */
+  private void skipArray(String key, boolean deepSearch) throws IOException {
+    for (;;) {
+      int event = parser.nextEvent();
+      switch (event) {
+        case JSONParser.OBJECT_START:
+          advanceToMapKey(key, deepSearch);
+          break;
+        case JSONParser.ARRAY_START:
+          // Nested array: recurse so its ARRAY_END is not mistaken for ours.
+          skipArray(key, deepSearch);
+          break;
+        case JSONParser.ARRAY_END:
+          return;
+      }
+    }
+  }
+
+
+  /**
+   * Positions the parser just inside the "docs" array: expects the response to start
+   * with an object, searches it (including nested objects) for "docs" and then expects
+   * an ARRAY_START event.
+   *
+   * @return whether "docs" was found
+   * @throws IOException if the first event is not OBJECT_START or the event after the
+   *         search is not ARRAY_START; that second check runs even when "docs" was not found
+   */
+  private boolean advanceToDocs() throws IOException {
+    expect(JSONParser.OBJECT_START);
+    boolean found = advanceToMapKey("docs", true);
+    expect(JSONParser.ARRAY_START);
+    return found;
+  }
+
+
+
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.Map.Entry;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+* Unions streamA with streamB ordering the Tuples based on a Comparator.
+* Both streams must be sorted by the fields being compared.
+**/
+
+
+public class MergeStream extends TupleStream implements ExpressibleStream {
+
+  private static final long serialVersionUID = 1;
+
+  // Both inputs are wrapped in PushBackStreams so read() can return the tuple it did not emit.
+  private PushBackStream streamA;
+  private PushBackStream streamB;
+  // Decides which of the two head tuples is emitted first.
+  private Comparator<Tuple> comp;
+
+  /**
+   * Merges the two streams using the given comparator.
+   * Each stream is wrapped in a PushBackStream.
+   */
+  public MergeStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
+    this.streamA = new PushBackStream(streamA);
+    this.streamB = new PushBackStream(streamB);
+    this.comp = comp;
+  }
+  
+  /**
+   * Builds the merge from an expression holding exactly two stream operands and one
+   * named 'on' operand whose value describes the comparator.
+   *
+   * @throws IOException if there are other operands, not exactly two streams, or no usable 'on' value
+   */
+  public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
+    // Collect the operands we understand: nested stream expressions and the named 'on' parameter.
+    List<StreamExpression> streamOperands = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+    StreamExpressionNamedParameter onOperand = factory.getNamedOperand(expression, "on");
+
+    // Anything beyond the stream operands plus the single 'on' parameter is unknown.
+    int expectedOperandCount = streamOperands.size() + 1;
+    if(expectedOperandCount != expression.getParameters().size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    if(streamOperands.size() != 2){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamOperands.size()));
+    }
+    this.streamA = new PushBackStream(factory.constructStream(streamOperands.get(0)));
+    this.streamB = new PushBackStream(factory.constructStream(streamOperands.get(1)));
+
+    boolean hasOnValue = onOperand != null && onOperand.getParameter() instanceof StreamExpressionValue;
+    if(!hasOnValue){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
+    }
+
+    // The 'on' value is handed to the factory, which is asked for a FieldComparator.
+    String onValue = ((StreamExpressionValue)onOperand.getParameter()).getValue();
+    this.comp = factory.constructComparator(onValue, FieldComparator.class);
+  }
+  
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {    
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    
+    // streams
+    expression.addParameter(streamA.toExpression(factory));
+    expression.addParameter(streamB.toExpression(factory));
+    
+    // on
+    if(comp instanceof ExpressibleComparator){
+      expression.addParameter(new StreamExpressionNamedParameter("on",((ExpressibleComparator)comp).toExpression(factory)));
+    }
+    else{
+      throw new IOException("This MergeStream contains a non-expressible comparator - it cannot be converted to an expression");
+    }
+    
+    return expression;   
+  }
+
+  /** Passes the context on to both input streams. */
+  public void setStreamContext(StreamContext context) {
+    streamA.setStreamContext(context);
+    streamB.setStreamContext(context);
+  }
+
+  /** Returns a new list holding the two (push-back wrapped) input streams, A first. */
+  public List<TupleStream> children() {
+    List<TupleStream> inputs = new ArrayList<TupleStream>(2);
+    inputs.add(streamA);
+    inputs.add(streamB);
+    return inputs;
+  }
+
+  /**
+   * Opens streamA, then streamB.
+   * NOTE(review): if opening streamB throws, streamA stays open; callers presumably
+   * have to call close() themselves in that case - confirm.
+   */
+  public void open() throws IOException {
+    streamA.open();
+    streamB.open();
+  }
+
+  /**
+   * Closes both input streams. streamB is closed even when closing streamA throws,
+   * so one failing side cannot leak the other; if both fail, the exception from
+   * streamB is the one that propagates.
+   */
+  public void close() throws IOException {
+    try {
+      streamA.close();
+    } finally {
+      streamB.close();
+    }
+  }
+
+  /**
+   * Reads one tuple from each side, returns the one that sorts first and pushes the
+   * other back for the next call. On a comparator tie the tuple from streamB is
+   * returned. Once one side is at EOF the other side is passed through; when both
+   * are at EOF, streamA's EOF tuple is returned.
+   */
+  public Tuple read() throws IOException {
+    Tuple a = streamA.read();
+    Tuple b = streamB.read();
+
+    if(a.EOF) {
+      if(b.EOF) {
+        return a;
+      }
+      // A is exhausted: keep its EOF marker for the next call and pass B through.
+      streamA.pushBack(a);
+      return b;
+    }
+
+    // B exhausted, or A strictly sorts first: emit A and keep B for later.
+    if(b.EOF || comp.compare(a,b) < 0) {
+      streamB.pushBack(b);
+      return a;
+    }
+
+    streamA.pushBack(a);
+    return b;
+  }
+
+  /** Always reports a cost of 0. */
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.io.ByteArrayOutputStream;
+import java.util.Random;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Base64;
+
+/**
+ * The ParallelStream decorates a TupleStream implementation and pushes it to N workers for parallel execution.
+ * Workers are chosen from a SolrCloud collection.
+ * Tuples that are streamed back from the workers are ordered by a Comparator.
+ **/
+
+
+public class ParallelStream extends CloudSolrStream implements ExpressibleStream {
+
+  private TupleStream tupleStream;
+  private int workers;
+  private boolean objectSerialize = true;
+  private transient StreamFactory streamFactory;
+
+  /**
+   * Creates a ParallelStream around an already constructed TupleStream.
+   * All arguments are passed to init unchanged.
+   *
+   * @param zkHost the ZooKeeper host string
+   * @param collection the collection the workers are chosen from
+   * @param tupleStream the stream to push to the workers
+   * @param workers the number of workers
+   * @param comp the comparator stored as the ordering of this stream
+   * @throws IOException if init rejects the arguments
+   **/
+  public ParallelStream(String zkHost,
+                        String collection,
+                        TupleStream tupleStream,
+                        int workers,
+                        Comparator<Tuple> comp) throws IOException {
+    this.init(zkHost, collection, tupleStream, workers, comp);
+  }
+
+
+  /**
+   * Creates a ParallelStream from a streaming expression string.
+   * Parsing the expression needs a StreamFactory, but the streamFactory field
+   * has no initializer and nothing assigns it before this constructor body runs,
+   * so this constructor cannot currently succeed. It fails with a descriptive
+   * IOException instead of a bare NullPointerException.
+   * Use {@code ParallelStream(StreamExpression, StreamFactory)} to build from an expression.
+   *
+   * @param zkHost the ZooKeeper host string
+   * @param collection the collection the workers are chosen from
+   * @param expressionString the expression describing the stream to push to the workers
+   * @param workers the number of workers
+   * @param comp the comparator stored as the ordering of this stream
+   * @throws IOException if no StreamFactory is available or the stream cannot be constructed
+   **/
+  public ParallelStream(String zkHost,
+                        String collection,
+                        String expressionString,
+                        int workers,
+                        Comparator<Tuple> comp) throws IOException {
+    objectSerialize = false;
+    if(null == this.streamFactory) {
+      throw new IOException("Unable to create ParallelStream from an expression string - no StreamFactory is available to parse it");
+    }
+    TupleStream tStream = this.streamFactory.constructStream(expressionString);
+    init(zkHost,collection, tStream, workers,comp);
+  }
+
+  /**
+   * Creates a ParallelStream from a parsed streaming expression.
+   * The expression must contain: the collection name as first value operand, a
+   * named 'workers' operand holding a positive integer, exactly one stream
+   * expression, a named 'sort' operand and, optionally, a named 'zkHost' operand.
+   * When 'zkHost' is absent it is looked up in the factory by collection name.
+   *
+   * @param expression the expression to build this stream from
+   * @param factory the factory used to read operands and construct the inner stream and comparator
+   * @throws IOException if the expression is missing operands, holds unknown operands or holds invalid values
+   **/
+  public ParallelStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    objectSerialize = false;
+    String collectionName = factory.getValueOperand(expression, 0);
+    StreamExpressionNamedParameter workersParam = factory.getNamedOperand(expression, "workers");
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+    StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort");
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // validate expression contains only what we want:
+    // the stream(s) + collection name + workers + sort, plus zkHost when given.
+    if(expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Workers
+    if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workers' parameter of type positive integer but didn't find one",expression));
+    }
+    String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue();
+    int workersInt = 0;
+    try{
+      workersInt = Integer.parseInt(workersStr);
+      if(workersInt <= 0){
+        throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' must be greater than 0.",expression, workersStr));
+      }
+    }
+    catch(NumberFormatException e){
+      // keep the parse failure as the cause so it is not lost
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' is not a valid integer.",expression, workersStr), e);
+    }
+
+    // Stream
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    // Sort
+    if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter telling us how to join the parallel streams but didn't find one",expression));
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items
+    TupleStream stream = factory.constructStream(streamExpressions.get(0));
+    Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
+    streamFactory = factory;
+    init(zkHost,collectionName,stream,workersInt,comp);
+  }
+
+  /**
+   * Stores the constructor arguments on this instance and then verifies that
+   * the wrapped stream is expressible whenever object serialization is turned off.
+   *
+   * @throws IOException if objectSerialize is false and tupleStream is not an ExpressibleStream
+   **/
+  private void init(String zkHost,String collection,TupleStream tupleStream,int workers,Comparator<Tuple> comp) throws IOException{
+    this.tupleStream = tupleStream;
+    this.comp = comp;
+    this.workers = workers;
+    this.collection = collection;
+    this.zkHost = zkHost;
+
+    // Without object serialization the stream has to be expressible.
+    boolean expressible = tupleStream instanceof ExpressibleStream;
+    if(!objectSerialize && !expressible){
+      throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream.");
+    }
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {    
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    
+    // collection
+    expression.addParameter(collection);
+    
+    // workers
+    expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers)));
+    
+    // stream
+    if(tupleStream instanceof ExpressibleStream){
+      expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory));
+    }
+    else{
+      throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+        
+    // sort
+    if(comp instanceof ExpressibleComparator){
+      expression.addParameter(new StreamExpressionNamedParameter("sort",((ExpressibleComparator)comp).toExpression(factory)));
+    }
+    else{
+      throw new IOException("This ParallelStream contains a non-expressible comparator - it cannot be converted to an expression");
+    }
+    
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+    
+    return expression;   
+  }
+  
+  /**
+   * Returns a newly allocated list holding the single wrapped stream.
+   **/
+  public List<TupleStream> children() {
+    List<TupleStream> wrapped = new ArrayList<>();
+    wrapped.add(tupleStream);
+    return wrapped;
+  }
+
+  /**
+   * Returns the next tuple from _read().
+   * When that tuple is the EOF marker, a fresh EOF tuple is returned instead,
+   * carrying the fields of every entry in eofTuples as metrics, keyed as in eofTuples.
+   *
+   * @return the next tuple, or an EOF tuple carrying the collected metrics
+   * @throws IOException if the underlying read fails
+   **/
+  public Tuple read() throws IOException {
+    Tuple next = _read();
+    if(!next.EOF) {
+      return next;
+    }
+
+    Map eofFields = new HashMap();
+    eofFields.put("EOF", true);
+    Tuple eofTuple = new Tuple(eofFields);
+
+    Map<String, Map> metrics = new HashMap();
+    for(Iterator<Entry<String,Tuple>> entries = this.eofTuples.entrySet().iterator(); entries.hasNext();) {
+      Map.Entry<String, Tuple> entry = entries.next();
+      metrics.put(entry.getKey(), entry.getValue().fields);
+    }
+
+    eofTuple.setMetrics(metrics);
+    return eofTuple;
+  }
+
+  /**
+   * Stores the context, adopts its StreamFactory when this stream has none yet,
+   * and passes the context on to the wrapped stream.
+   *
+   * @param streamContext the context to store and propagate
+   **/
+  public void setStreamContext(StreamContext streamContext) {
+    this.streamContext = streamContext;
+    if(null == this.streamFactory) {
+      // A factory set earlier is kept; only a missing one is taken from the context.
+      this.streamFactory = streamContext.getStreamFactory();
+    }
+    tupleStream.setStreamContext(streamContext);
+  }
+
+  /**
+   * Builds one SolrStream per worker and adds it to solrStreams.
+   * The wrapped stream is sent either as a Base64 and URL encoded serialized
+   * object (objectSerialize true) or as its expression (objectSerialize false).
+   * Workers are the first {@code workers} replicas of a shuffled list of all
+   * replicas of the active slices of the collection.
+   *
+   * @throws IOException wrapping any exception raised while building the streams,
+   *         including the case where more workers are requested than replicas exist
+   **/
+  protected void constructStreams() throws IOException {
+
+    try {
+      Object pushStream = null;
+
+      if (!objectSerialize) {
+        pushStream = ((ExpressibleStream) tupleStream).toExpression(streamFactory);
+      } else {
+        ByteArrayOutputStream byteSink = new ByteArrayOutputStream();
+        ObjectOutputStream objectSink = new ObjectOutputStream(byteSink);
+        objectSink.writeObject(tupleStream);
+        byte[] serialized = byteSink.toByteArray();
+        String base64 = Base64.byteArrayToBase64(serialized, 0, serialized.length);
+        pushStream = URLEncoder.encode(base64, "UTF-8");
+      }
+
+      ClusterState clusterState = cloudSolrClient.getZkStateReader().getClusterState();
+      Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
+      long seed = System.currentTimeMillis();
+
+      // Flatten the replicas of every active slice into one candidate list.
+      List<Replica> candidates = new ArrayList<>();
+      for(Slice slice : slices) {
+        candidates.addAll(slice.getReplicas());
+      }
+
+      if(candidates.size() < workers) {
+        throw new IOException("Number of workers exceeds nodes in the worker collection");
+      }
+
+      Collections.shuffle(candidates, new Random(seed));
+
+      for(int workerID=0; workerID<workers; workerID++) {
+        HashMap params = new HashMap();
+        params.put("distrib","false"); // We are the aggregator.
+        params.put("numWorkers", workers);
+        params.put("workerID", workerID);
+        params.put("stream", pushStream);
+        params.put("qt","/stream");
+        params.put("objectSerialize", objectSerialize);
+        String url = new ZkCoreNodeProps(candidates.get(workerID)).getCoreUrl();
+        solrStreams.add(new SolrStream(url, params));
+      }
+
+      assert(solrStreams.size() == workers);
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Sets whether the wrapped stream is pushed to the workers as a serialized
+   * object (true) or as an expression (false).
+   *
+   * @param objectSerialize the new value of the flag
+   **/
+  public void setObjectSerialize(boolean objectSerialize) {
+    this.objectSerialize = objectSerialize;
+  }
+
+  /**
+   * Returns the current value of the objectSerialize flag.
+   **/
+  public boolean getObjectSerialize() {
+    return this.objectSerialize;
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * A TupleStream that allows a single Tuple to be pushed back onto the stream after it's been read.
+ * This is a useful class when building streams that maintain the order of Tuples between multiple
+ * substreams.
+ **/
+
+public class PushBackStream extends TupleStream {
+
+  private static final long serialVersionUID = 1;
+
+  // The wrapped stream that tuples are read from.
+  private TupleStream stream;
+  // The tuple handed to pushBack and not yet re-read; null when there is none.
+  private Tuple tuple;
+
+  /**
+   * Wraps the given stream.
+   *
+   * @param stream the stream to read from
+   **/
+  public PushBackStream(TupleStream stream) {
+    this.stream = stream;
+  }
+
+  /**
+   * Returns the expression of the wrapped stream; the PushBackStream itself
+   * adds nothing to the expression.
+   *
+   * @param factory the factory handed to the wrapped stream
+   * @throws IOException if the wrapped stream is not an ExpressibleStream
+   **/
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException{
+    if(!(stream instanceof ExpressibleStream)){
+      throw new IOException("This PushBackStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+
+    return ((ExpressibleStream)stream).toExpression(factory);
+  }
+
+  /**
+   * Passes the context on to the wrapped stream.
+   **/
+  public void setStreamContext(StreamContext context) {
+    stream.setStreamContext(context);
+  }
+
+  /**
+   * Returns a newly allocated list holding the single wrapped stream.
+   **/
+  public List<TupleStream> children() {
+    List<TupleStream> wrapped = new ArrayList<>();
+    wrapped.add(stream);
+    return wrapped;
+  }
+
+  /**
+   * Opens the wrapped stream.
+   **/
+  public void open() throws IOException {
+    this.stream.open();
+  }
+
+  /**
+   * Closes the wrapped stream.
+   **/
+  public void close() throws IOException {
+    this.stream.close();
+  }
+
+  /**
+   * Stores a tuple so that the next call to read() returns it.
+   * Only one tuple is held; a second pushBack before a read replaces the first.
+   *
+   * @param tuple the tuple to return from the next read()
+   **/
+  public void pushBack(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  /**
+   * Returns the pushed back tuple when there is one, clearing it;
+   * otherwise reads from the wrapped stream.
+   **/
+  public Tuple read() throws IOException {
+    if(tuple == null) {
+      return stream.read();
+    }
+
+    Tuple pushedBack = tuple;
+    tuple = null;
+    return pushedBack;
+  }
+
+  /**
+   * Reports the cost of this stream; this implementation always reports 0.
+   **/
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.PriorityQueue;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+*  Iterates over a TupleStream and ranks the top N tuples based on a Comparator.
+**/
+
+public class RankStream extends TupleStream implements ExpressibleStream {
+
+  private static final long serialVersionUID = 1;
+
+  private TupleStream tupleStream;
+  private Comparator<Tuple> comp;
+  private int size;
+  private transient PriorityQueue<Tuple> top;
+  private transient boolean finished = false;
+  private transient LinkedList<Tuple> topList;
+
+  /**
+   * Creates a RankStream over an already constructed TupleStream.
+   *
+   * @param tupleStream the stream whose tuples are ranked
+   * @param size the number of tuples to keep
+   * @param comp the comparator used to rank the tuples
+   **/
+  public RankStream(TupleStream tupleStream, int size, Comparator<Tuple> comp) {
+    this.init(tupleStream, size, comp);
+  }
+  
+  /**
+   * Creates a RankStream from a parsed streaming expression.
+   * The expression must contain exactly one stream expression, a named 'n'
+   * operand holding a positive integer and a named 'sort' operand.
+   *
+   * @param expression the expression to build this stream from
+   * @param factory the factory used to read operands and construct the inner stream and comparator
+   * @throws IOException if the expression is missing operands, holds unknown operands or holds invalid values
+   **/
+  public RankStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+    StreamExpressionNamedParameter nParam = factory.getNamedOperand(expression, "n");
+    StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort");
+
+    // validate expression contains only what we want: the stream(s) + n + sort.
+    if(expression.getParameters().size() != streamExpressions.size() + 2){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    // n
+    if(null == nParam || null == nParam.getParameter() || !(nParam.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'n' parameter of type positive integer but didn't find one",expression));
+    }
+    String nStr = ((StreamExpressionValue)nParam.getParameter()).getValue();
+    int nInt = 0;
+    try{
+      nInt = Integer.parseInt(nStr);
+      if(nInt <= 0){
+        throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' must be greater than 0.",expression, nStr));
+      }
+    }
+    catch(NumberFormatException e){
+      // keep the parse failure as the cause so it is not lost
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' is not a valid integer.",expression, nStr), e);
+    }
+
+    // stream
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    // sort
+    if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter telling us how to rank the tuples but didn't find one",expression));
+    }
+
+    TupleStream stream = factory.constructStream(streamExpressions.get(0));
+    Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
+
+    init(stream,nInt,comp);
+  }
+  
+  /**
+   * Stores the constructor arguments on this instance.
+   **/
+  private void init(TupleStream tupleStream, int size, Comparator<Tuple> comp){
+    this.size = size;
+    this.comp = comp;
+    this.tupleStream = tupleStream;
+  }
+  
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {    
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    
+    // n
+    expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
+    
+    // stream
+    if(tupleStream instanceof ExpressibleStream){
+      expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory));
+    }
+    else{
+      throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+        
+    // sort
+    if(comp instanceof ExpressibleComparator){
+      expression.addParameter(new StreamExpressionNamedParameter("sort",((ExpressibleComparator)comp).toExpression(factory)));
+    }
+    else{
+      throw new IOException("This RankStream contains a non-expressible comparator - it cannot be converted to an expression");
+    }
+    
+    return expression;   
+  }
+  
+  /**
+   * Passes the context on to the wrapped stream.
+   *
+   * @param context the stream context to propagate
+   **/
+  public void setStreamContext(StreamContext context) {
+    tupleStream.setStreamContext(context);
+  }
+
+  /**
+   * Returns a newly allocated list holding the single wrapped stream.
+   **/
+  public List<TupleStream> children() {
+    List<TupleStream> wrapped = new ArrayList<>();
+    wrapped.add(tupleStream);
+    return wrapped;
+  }
+
+  /**
+   * Prepares the ranking state and opens the wrapped stream.
+   * The queue and the result list are recreated and the finished flag is
+   * cleared, so a stream that is opened again ranks its input again rather
+   * than reading from the new, empty result list.
+   *
+   * @throws IOException if the wrapped stream fails to open
+   **/
+  public void open() throws IOException {
+    // ReverseComp inverts comp, so the head of the queue is the tuple read() evicts first.
+    this.top = new PriorityQueue<>(size, new ReverseComp(comp));
+    this.topList = new LinkedList<>();
+    this.finished = false;
+    tupleStream.open();
+  }
+
+  /**
+   * Closes the wrapped stream.
+   *
+   * @throws IOException if the wrapped stream fails to close
+   **/
+  public void close() throws IOException {
+    this.tupleStream.close();
+  }
+  
+  /**
+   * Returns the comparator this stream ranks tuples with.
+   **/
+  public Comparator<Tuple> getComparator(){
+    return comp;
+  }
+
+  /**
+   * On the first call, consumes the whole wrapped stream, keeping at most
+   * {@code size} tuples in the queue, then moves them into the result list
+   * followed by the EOF tuple of the wrapped stream.
+   * Every call returns and removes the head of the result list.
+   *
+   * @return the next ranked tuple, then the EOF tuple; null once the result list is empty
+   * @throws IOException if reading from the wrapped stream fails
+   **/
+  public Tuple read() throws IOException {
+    if(!finished) {
+      Tuple current = tupleStream.read();
+      while(!current.EOF) {
+        if(top.size() < size) {
+          top.add(current);
+        } else if(comp.compare(current, top.peek()) < 0) {
+          // Queue is full: replace its head when the new tuple sorts before it.
+          top.poll();
+          top.add(current);
+        }
+        current = tupleStream.read();
+      }
+
+      finished = true;
+      // Each polled tuple goes in front of the ones polled before it,
+      // so the last tuple polled ends up first in the result list.
+      while(!top.isEmpty()) {
+        topList.addFirst(top.poll());
+      }
+      topList.addLast(current);
+    }
+
+    return topList.pollFirst();
+  }
+
+  /**
+   * Reports the cost of this stream; this implementation always reports 0.
+   **/
+  public int getCost() {
+    final int cost = 0;
+    return cost;
+  }
+
+  /**
+   * Comparator that imposes the reverse ordering of the comparator it wraps.
+   **/
+  class ReverseComp implements Comparator<Tuple>, Serializable {
+
+    private Comparator<Tuple> comp;
+
+    public ReverseComp(Comparator<Tuple> comp) {
+      this.comp = comp;
+    }
+
+    /**
+     * Compares the tuples in swapped order.
+     * Swapping the arguments instead of negating the result stays correct
+     * when the wrapped comparator returns Integer.MIN_VALUE, whose negation
+     * overflows back to Integer.MIN_VALUE.
+     **/
+    public int compare(Tuple t1, Tuple t2) {
+      return comp.compare(t2, t1);
+    }
+  }
+}
\ No newline at end of file