You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/11 14:37:19 UTC

svn commit: r1678743 [3/3] - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/response/ server/solr/configsets/data_driven_schema_configs/conf/ solrj/src/java/org/apache/solr/client/solrj/io/ solrj/src/jav...

Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java Mon May 11 12:37:18 2015
@@ -0,0 +1,745 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *  All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so
+ *  SolrStream will get fully exercised through these tests.
+ *
+ **/
+
+@Slow
+@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
+public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
+
+  private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
+
+  static {
+    schemaString = "schema-streaming.xml";
+  }
+
+  @BeforeClass
+  public static void beforeSuperClass() {
+    AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
+  }
+
+  @AfterClass
+  public static void afterSuperClass() {
+
+  }
+
+  protected String getCloudSolrConfig() {
+    return "solrconfig-streaming.xml";
+  }
+
+
+  @Override
+  public String getSolrHome() {
+    return SOLR_HOME;
+  }
+
+  public static String SOLR_HOME() {
+    return SOLR_HOME;
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // we expect this time of exception as shards go up and down...
+    //ignoreException(".*");
+
+    System.setProperty("numShards", Integer.toString(sliceCount));
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    resetExceptionIgnores();
+  }
+
+  public StreamExpressionTest() {
+    super();
+    sliceCount = 2;
+  }
+
+  @Test
+  public void testAll() throws Exception{
+    assertNotNull(cloudClient);
+
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    waitForThingsToLevelOut(30);
+
+    del("*:*");
+    commit();
+    
+    testCloudSolrStream();
+    testCloudSolrStreamWithZkHost();
+    testMergeStream();
+    testRankStream();
+    testReducerStream();
+    testUniqueStream();
+    testParallelUniqueStream();
+    testParallelReducerStream();
+    testParallelRankStream();
+    testParallelMergeStream();
+  }
+
+  private void testCloudSolrStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    commit();
+
+    StreamFactory factory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress());
+    StreamExpression expression;
+    CloudSolrStream stream;
+    List<Tuple> tuples;
+    
+    // Basic test
+    expression = StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    assertLong(tuples.get(0),"a_i", 0);
+
+    // Basic w/aliases
+    expression = StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    assertLong(tuples.get(0),"alias.a_i", 0);
+    assertString(tuples.get(0),"name", "hello0");    
+
+    // Basic filtered test
+    expression = StreamExpressionParser.parse("search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+    assertLong(tuples.get(1),"a_i", 3);
+    
+    del("*:*");
+    commit();
+  }
+
+  private void testCloudSolrStreamWithZkHost() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    commit();
+
+    StreamFactory factory = new StreamFactory();
+    StreamExpression expression;
+    CloudSolrStream stream;
+    List<Tuple> tuples;
+    
+    // Basic test
+    expression = StreamExpressionParser.parse("search(collection1, zkHost=" + zkServer.getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    assertLong(tuples.get(0),"a_i", 0);
+
+    // Basic w/aliases
+    expression = StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + zkServer.getZkAddress() + ")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    assertLong(tuples.get(0),"alias.a_i", 0);
+    assertString(tuples.get(0),"name", "hello0");    
+
+    // Basic filtered test
+    expression = StreamExpressionParser.parse("search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost=" + zkServer.getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+    assertLong(tuples.get(1),"a_i", 3);
+    
+    del("*:*");
+    commit();
+  }
+
+  
+  private void testUniqueStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withStreamFunction("search", CloudSolrStream.class)
+      .withStreamFunction("unique", UniqueStream.class);
+    
+    // Basic test
+    expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc\")");
+    stream = new UniqueStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 0, 1, 3, 4);
+
+    // Basic test desc
+    expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f desc\")");
+    stream = new UniqueStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 4,3,1,2);
+    
+    // Basic w/multi comp
+    expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc, a_i asc\")");
+    stream = new UniqueStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    
+    // full factory w/multi comp
+    stream = factory.constructStream("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc, a_i asc\")");
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0, 2, 1, 3, 4);
+    
+    del("*:*");
+    commit();
+  }
+  
+  private void testMergeStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withStreamFunction("search", CloudSolrStream.class)
+      .withStreamFunction("unique", UniqueStream.class)
+      .withStreamFunction("merge", MergeStream.class);
+    
+    // Basic test
+    expression = StreamExpressionParser.parse("merge("
+                                                + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+                                                + "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+                                                + "on=\"a_f asc\")");
+    stream = new MergeStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 0,1,3,4);
+
+    // Basic test desc
+    expression = StreamExpressionParser.parse("merge("
+                                              + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+                                              + "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+                                              + "on=\"a_f desc\")");
+    stream = new MergeStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 4,3,1,0);
+    
+    // Basic w/multi comp
+    expression = StreamExpressionParser.parse("merge("
+                                              + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                              + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                              + "on=\"a_f asc, a_s asc\")");
+    stream = new MergeStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    
+    // full factory w/multi comp
+    stream = factory.constructStream("merge("
+                                    + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                    + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                                    + "on=\"a_f asc, a_s asc\")");
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+    
+    del("*:*");
+    commit();
+  }
+  
+  private void testRankStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withStreamFunction("search", CloudSolrStream.class)
+      .withStreamFunction("unique", UniqueStream.class)
+      .withStreamFunction("top", RankStream.class);
+    
+    // Basic test
+    expression = StreamExpressionParser.parse("top("
+                                              + "n=3,"
+                                              + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+                                              + "sort=\"a_f asc, a_i asc\")");
+    stream = new RankStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,2,1);
+
+    // Basic test desc
+    expression = StreamExpressionParser.parse("top("
+                                              + "n=2,"
+                                              + "unique("
+                                              +   "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+                                              +   "over=\"a_f desc\"),"
+                                              + "sort=\"a_f desc\")");
+    stream = new RankStream(expression, factory);
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 2);
+    assertOrder(tuples, 4,3);
+    
+    // full factory
+    stream = factory.constructStream("top("
+                                    + "n=4,"
+                                    + "unique("
+                                    +   "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+                                    +   "over=\"a_f asc\"),"
+                                    + "sort=\"a_f asc\")");
+    tuples = getTuples(stream);
+    
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 0,1,3,4);
+    
+    del("*:*");
+    commit();
+  }
+  
+  private void testReducerStream() throws Exception{
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+    commit();
+    
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+    Tuple t0, t1, t2;
+    List<Map> maps0, maps1, maps2;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", zkServer.getZkAddress())
+      .withStreamFunction("search", CloudSolrStream.class)
+      .withStreamFunction("unique", UniqueStream.class)
+      .withStreamFunction("top", RankStream.class)
+      .withStreamFunction("group", ReducerStream.class);
+
+    // basic
+    expression = StreamExpressionParser.parse("group("
+                                              + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+                                              + "by=\"a_s asc\")");
+    stream = new ReducerStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+
+    t0 = tuples.get(0);
+    maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2,1, 9);
+
+    t1 = tuples.get(1);
+    maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+    t2 = tuples.get(2);
+    maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
+    
+    // basic w/spaces
+    expression = StreamExpressionParser.parse("group("
+                                              + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f       asc\"),"
+                                              + "by=\"a_s asc\")");
+    stream = new ReducerStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+
+    t0 = tuples.get(0);
+    maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2,1, 9);
+
+    t1 = tuples.get(1);
+    maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+    t2 = tuples.get(2);
+    maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testParallelUniqueStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1");
+    indexr(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5");
+    indexr(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5");
+    indexr(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress())
+        .withStreamFunction("search", CloudSolrStream.class)
+        .withStreamFunction("unique", UniqueStream.class)
+        .withStreamFunction("top", RankStream.class)
+        .withStreamFunction("group", ReducerStream.class)
+        .withStreamFunction("parallel", ParallelStream.class);
+
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
+
+    List<Tuple> tuples = getTuples(pstream);
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,1,3,4,6);
+
+    //Test the eofTuples
+
+    Map<String,Tuple> eofTuples = pstream.getEofTuples();
+    assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+
+    del("*:*");
+    commit();
+
+  }
+
+  private void testParallelReducerStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress())
+        .withStreamFunction("search", CloudSolrStream.class)
+        .withStreamFunction("unique", UniqueStream.class)
+        .withStreamFunction("top", RankStream.class)
+        .withStreamFunction("group", ReducerStream.class)
+        .withStreamFunction("parallel", ParallelStream.class);
+
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
+
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+
+    Tuple t0 = tuples.get(0);
+    List<Map> maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2, 1, 9);
+
+    Tuple t1 = tuples.get(1);
+    List<Map> maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+    Tuple t2 = tuples.get(2);
+    List<Map> maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
+
+    //Test Descending with Ascending subsort
+
+    pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
+
+    tuples = getTuples(pstream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 4,3,0);
+
+    t0 = tuples.get(0);
+    maps0 = t0.getMaps();
+    assertMaps(maps0, 4, 6);
+
+
+    t1 = tuples.get(1);
+    maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+
+    t2 = tuples.get(2);
+    maps2 = t2.getMaps();
+    assertMaps(maps2, 0, 2, 1, 9);
+
+
+
+    del("*:*");
+    commit();
+  }
+
+  private void testParallelRankStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1");
+    indexr(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1");
+    indexr(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1");
+    indexr(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1");
+    indexr(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1");
+    indexr(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress())
+        .withStreamFunction("search", CloudSolrStream.class)
+        .withStreamFunction("unique", UniqueStream.class)
+        .withStreamFunction("top", RankStream.class)
+        .withStreamFunction("group", ReducerStream.class)
+        .withStreamFunction("parallel", ParallelStream.class);
+
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, top(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), n=\"11\", sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 10);
+    assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testParallelMergeStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    StreamFactory streamFactory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress())
+        .withStreamFunction("search", CloudSolrStream.class)
+        .withStreamFunction("unique", UniqueStream.class)
+        .withStreamFunction("top", RankStream.class)
+        .withStreamFunction("group", ReducerStream.class)
+        .withStreamFunction("merge", MergeStream.class)
+        .withStreamFunction("parallel", ParallelStream.class);
+
+    //Test ascending
+    ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, merge(search(collection1, q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(collection1, q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i asc\")");
+
+    List<Tuple> tuples = getTuples(pstream);
+
+
+
+    assert(tuples.size() == 9);
+    assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
+
+    //Test descending
+
+    pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, merge(search(collection1, q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(collection1, q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+
+    tuples = getTuples(pstream);
+
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 9,8,6,4,3,2,1,0);
+
+    del("*:*");
+    commit();
+  }
+
+
+  protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+    tupleStream.open();
+    List<Tuple> tuples = new ArrayList<Tuple>();
+    for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
+      tuples.add(t);
+    }
+    tupleStream.close();
+    return tuples;
+  }
+  protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
+    return assertOrderOf(tuples, "id", ids);
+  }
+  protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... ids) throws Exception {
+    int i = 0;
+    for(int val : ids) {
+      Tuple t = tuples.get(i);
+      Long tip = (Long)t.get(fieldName);
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
+    return true;
+  }
+
+  protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
+    List<?> group = (List<?>)tuple.get("tuples");
+    int i=0;
+    for(int val : ids) {
+      Map<?,?> t = (Map<?,?>)group.get(i);
+      Long tip = (Long)t.get("id");
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
+    return true;
+  }
+
+  public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+    long lv = (long)tuple.get(fieldName);
+    if(lv != l) {
+      throw new Exception("Longs not equal:"+l+" : "+lv);
+    }
+
+    return true;
+  }
+  
+  public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception {
+    String actual = (String)tuple.get(fieldName);
+    
+    if( (null == expected && null != actual) ||
+        (null != expected && null == actual) ||
+        (null != expected && !expected.equals(actual))){
+      throw new Exception("Longs not equal:"+expected+" : "+actual);
+    }
+
+    return true;
+  }
+  
+  protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
+    if(maps.size() != ids.length) {
+      throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
+    }
+
+    int i=0;
+    for(int val : ids) {
+      Map t = maps.get(i);
+      Long tip = (Long)t.get("id");
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
+    return true;
+  }
+
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = getDoc(fields);
+    indexDoc(doc);
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java Mon May 11 12:37:18 2015
@@ -0,0 +1,137 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.junit.Test;
+
+/**
+ **/
+
+public class StreamExpressionToExpessionTest extends LuceneTestCase {
+
+  private StreamFactory factory;
+  
+  public StreamExpressionToExpessionTest() {
+    super();
+    
+    factory = new StreamFactory()
+                    .withCollectionZkHost("collection1", "testhost:1234")
+                    .withStreamFunction("search", CloudSolrStream.class)
+                    .withStreamFunction("merge", MergeStream.class)
+                    .withStreamFunction("unique", UniqueStream.class)
+                    .withStreamFunction("top", RankStream.class)
+                    .withStreamFunction("group", ReducerStream.class)
+                    ;
+  }
+  
+  
+  @Test
+  public void testCloudSolrStream() throws Exception {
+
+    CloudSolrStream stream;
+    String expressionString;
+    
+    // Basic test
+    stream = new CloudSolrStream(StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("search(collection1,"));
+    assertTrue(expressionString.contains("q=\"*:*\""));
+    assertTrue(expressionString.contains("fl=\"id,a_s,a_i,a_f\""));
+    assertTrue(expressionString.contains("sort=\"a_f asc, a_i asc\""));
+    
+    // Basic w/aliases
+    stream = new CloudSolrStream(StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"id=izzy,a_s=kayden\")"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("id=izzy"));
+    assertTrue(expressionString.contains("a_s=kayden"));
+
+  }
+
+  @Test
+  public void testUniqueStream() throws Exception {
+
+    UniqueStream stream;
+    String expressionString;
+    
+    // Basic test
+    stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc\")"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("unique(search(collection1"));
+    assertTrue(expressionString.contains("over=\"a_f asc\""));
+  }
+  
+  @Test
+  public void testMergeStream() throws Exception {
+
+    MergeStream stream;
+    String expressionString;
+    
+    // Basic test
+    stream = new MergeStream(StreamExpressionParser.parse("merge("
+                              + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                              + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+                              + "on=\"a_f asc, a_s asc\")"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("q=\"id:(0 3 4)\""));
+    assertTrue(expressionString.contains("q=\"id:(1 2)\""));
+    assertTrue(expressionString.contains("on=\"a_f asc,a_s asc\""));
+  }
+  
+  @Test
+  public void testRankStream() throws Exception {
+
+    RankStream stream;
+    String expressionString;
+    
+    // Basic test
+    stream = new RankStream(StreamExpressionParser.parse("top("
+                                              + "n=3,"
+                                              + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc,a_i asc\"),"
+                                              + "sort=\"a_f asc, a_i asc\")"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("top(n=3,search(collection1"));
+    assertTrue(expressionString.contains("sort=\"a_f asc,a_i asc\""));
+    // find 2nd instance of sort
+    assertTrue(expressionString.substring(expressionString.indexOf("sort=\"a_f asc,a_i asc\"") + 1).contains("sort=\"a_f asc,a_i asc\""));
+  }
+
+  @Test
+  public void testReducerStream() throws Exception {
+
+    ReducerStream stream;
+    String expressionString;
+    
+    // Basic test
+    stream = new ReducerStream(StreamExpressionParser.parse("group("
+                                                  + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\"),"
+                                                  + "by=\"a_s desc\")"), factory);
+    expressionString = stream.toExpression(factory).toString();
+    assertTrue(expressionString.contains("group(search(collection1"));
+    assertTrue(expressionString.contains("by=\"a_s desc\""));
+  }
+
+}

Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Mon May 11 12:37:18 2015
@@ -0,0 +1,896 @@
+package org.apache.solr.client.solrj.io.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.MergeStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.ReducerStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ *  All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so
+ *  SolrStream will get fully exercised through these tests.
+ *
+ **/
+
+@Slow
+@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
+public class StreamingTest extends AbstractFullDistribZkTestBase {
+
+  private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
+  private StreamFactory streamFactory;
+
+  static {
+    schemaString = "schema-streaming.xml";
+  }
+
+  @BeforeClass
+  public static void beforeSuperClass() {
+    AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
+  }
+
+  @AfterClass
+  public static void afterSuperClass() {
+
+  }
+
+  protected String getCloudSolrConfig() {
+    return "solrconfig-streaming.xml";
+  }
+
+
+  @Override
+  public String getSolrHome() {
+    return SOLR_HOME;
+  }
+
+  public static String SOLR_HOME() {
+    return SOLR_HOME;
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // we expect this time of exception as shards go up and down...
+    //ignoreException(".*");
+
+    System.setProperty("numShards", Integer.toString(sliceCount));
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    resetExceptionIgnores();
+  }
+
+  public StreamingTest() {
+    super();
+    sliceCount = 2;
+
+    streamFactory = new StreamFactory()
+                    .withStreamFunction("search", CloudSolrStream.class)
+                    .withStreamFunction("merge", MergeStream.class)
+                    .withStreamFunction("unique", UniqueStream.class)
+                    .withStreamFunction("top", RankStream.class)
+                    .withStreamFunction("group", ReducerStream.class)
+                    .withStreamFunction("count", CountStream.class)
+                    ;
+  }
+
+  private void testUniqueStream() throws Exception {
+
+    //Test CloudSolrStream and UniqueStream
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+
+    commit();
+
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    UniqueStream ustream = new UniqueStream(stream, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
+    List<Tuple> tuples = getTuples(ustream);
+    assert(tuples.size() == 4);
+    assertOrder(tuples, 0,1,3,4);
+
+    del("*:*");
+    commit();
+
+  }
+
+
+  private void testSpacesInParams() throws Exception {
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map params = mapParams("q","*:*","fl","id , a_s , a_i , a_f","sort", "a_f  asc , a_i  asc");
+
+    //CloudSolrStream compares the values of the sort with the fl field.
+    //The constructor will throw an exception if the sort fields do not the
+    //a value in the field list.
+
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+
+    del("*:*");
+    commit();
+
+  }
+
+  private void testNonePartitionKeys() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 20); // Each tuple will be double counted.
+
+    del("*:*");
+    commit();
+
+  }
+
+
+
+
+  private void testParallelUniqueStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1");
+    indexr(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5");
+    indexr(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5");
+    indexr(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    UniqueStream ustream = new UniqueStream(stream, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
+    List<Tuple> tuples = getTuples(pstream);
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,1,3,4,6);
+
+    //Test the eofTuples
+
+    Map<String,Tuple> eofTuples = pstream.getEofTuples();
+    assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+
+    del("*:*");
+    commit();
+
+  }
+
+
+
+  private void testRankStream() throws Exception {
+
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    List<Tuple> tuples = getTuples(rstream);
+
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 4,3,2);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testParallelRankStream() throws Exception {
+
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1");
+    indexr(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1");
+    indexr(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1");
+    indexr(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1");
+    indexr(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1");
+    indexr(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 10);
+    assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testTrace() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    //Test with spaces in the parameter lists.
+    Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    stream.setTrace(true);
+    List<Tuple> tuples = getTuples(stream);
+    assert(tuples.get(0).get("_COLLECTION_").equals("collection1"));
+    assert(tuples.get(1).get("_COLLECTION_").equals("collection1"));
+    assert(tuples.get(2).get("_COLLECTION_").equals("collection1"));
+    assert(tuples.get(3).get("_COLLECTION_").equals("collection1"));
+
+    del("*:*");
+    commit();
+  }
+
+
+
+
+  private void testReducerStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    //Test with spaces in the parameter lists.
+    Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+
+    List<Tuple> tuples = getTuples(rstream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+
+    Tuple t0 = tuples.get(0);
+    List<Map> maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2, 1, 9);
+
+    Tuple t1 = tuples.get(1);
+    List<Map> maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+    Tuple t2 = tuples.get(2);
+    List<Map> maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
+
+
+
+    del("*:*");
+    commit();
+  }
+
+  private void testZeroReducerStream() throws Exception {
+
+    //Gracefully handle zero results
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    //Test with spaces in the parameter lists.
+    Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+
+    List<Tuple> tuples = getTuples(rstream);
+
+    assert(tuples.size() == 0);
+
+    del("*:*");
+    commit();
+  }
+
+
+  private void testParallelReducerStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 0,3,4);
+
+    Tuple t0 = tuples.get(0);
+    List<Map> maps0 = t0.getMaps();
+    assertMaps(maps0, 0, 2, 1, 9);
+
+    Tuple t1 = tuples.get(1);
+    List<Map> maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+    Tuple t2 = tuples.get(2);
+    List<Map> maps2 = t2.getMaps();
+    assertMaps(maps2, 4, 6);
+
+    //Test Descending with Ascending subsort
+
+    paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+    pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+
+    tuples = getTuples(pstream);
+
+    assert(tuples.size() == 3);
+    assertOrder(tuples, 4,3,0);
+
+    t0 = tuples.get(0);
+    maps0 = t0.getMaps();
+    assertMaps(maps0, 4, 6);
+
+
+    t1 = tuples.get(1);
+    maps1 = t1.getMaps();
+    assertMaps(maps1, 3, 5, 7, 8);
+
+
+    t2 = tuples.get(2);
+    maps2 = t2.getMaps();
+    assertMaps(maps2, 0, 2, 1, 9);
+
+
+
+    del("*:*");
+    commit();
+  }
+
+  private void testZeroParallelReducerStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+
+    List<Tuple> tuples = getTuples(pstream);
+    assert(tuples.size() == 0);
+    del("*:*");
+    commit();
+  }
+
+
+  private void testTuple() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    List<Tuple> tuples = getTuples(stream);
+    Tuple tuple = tuples.get(0);
+
+    String s = tuple.getString("a_s");
+    assert(s.equals("hello0")) ;
+
+    long l = tuple.getLong("a_i");
+    assert(l == 0);
+
+    double d = tuple.getDouble("a_f");
+    assert(d == 5.1);
+
+
+    List<String> stringList = tuple.getStrings("s_multi");
+    assert(stringList.get(0).equals("a"));
+    assert(stringList.get(1).equals("b"));
+
+    List<Long> longList = tuple.getLongs("i_multi");
+    assert(longList.get(0).longValue() == 1);
+    assert(longList.get(1).longValue() == 2);
+
+    List<Double> doubleList = tuple.getDoubles("f_multi");
+    assert(doubleList.get(0).doubleValue() == 1.2);
+    assert(doubleList.get(1).doubleValue() == 1.3);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testMergeStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    //Test ascending
+    Map paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i asc");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    List<Tuple> tuples = getTuples(mstream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,1,2,3,4);
+
+    //Test descending
+    paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i desc");
+    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
+    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    tuples = getTuples(mstream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 4,3,2,1,0);
+
+    //Test compound sort
+
+    paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
+    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
+    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    mstream = new MergeStream(streamA, streamB, new MultiComp(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
+    tuples = getTuples(mstream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+
+    paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
+    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
+    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    mstream = new MergeStream(streamA, streamB, new MultiComp(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
+    tuples = getTuples(mstream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 2,0,1,3,4);
+
+    del("*:*");
+    commit();
+  }
+
+
+  private void testParallelMergeStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    //Test ascending
+    Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 9);
+    assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
+
+    //Test descending
+    paramsA = mapParams("q","id:(4 1 8 9)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
+    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
+    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    tuples = getTuples(pstream);
+
+    assert(tuples.size() == 8);
+    assertOrder(tuples, 9,8,6,4,3,2,1,0);
+
+    del("*:*");
+    commit();
+  }
+
+  private void testParallelEOF() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    //Test ascending
+    Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+    MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    CountStream cstream = new CountStream(mstream);
+    ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    List<Tuple> tuples = getTuples(pstream);
+
+    assert(tuples.size() == 9);
+    Map<String, Tuple> eofTuples = pstream.getEofTuples();
+    assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker.
+
+    long totalCount = 0;
+
+    Iterator<Tuple> it = eofTuples.values().iterator();
+    while(it.hasNext()) {
+      Tuple t = it.next();
+      totalCount += t.getLong("count");
+    }
+
+    assert(tuples.size() == totalCount);
+
+    del("*:*");
+    commit();
+  }
+
+
+
+  @Test
+  public void streamTests() throws Exception {
+    assertNotNull(cloudClient);
+
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    waitForThingsToLevelOut(30);
+
+    del("*:*");
+
+    commit();
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+    Map params = null;
+
+    //Basic CloudSolrStream Test with Descending Sort
+
+    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    List<Tuple> tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 4, 3, 2, 1, 0);
+
+    //With Ascending Sort
+    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
+    stream = new CloudSolrStream(zkHost, "collection1", params);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,1,2,3,4);
+
+
+    //Test compound sort
+    params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
+    stream = new CloudSolrStream(zkHost, "collection1", params);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 2,0,1,3,4);
+
+
+    params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
+    stream = new CloudSolrStream(zkHost, "collection1", params);
+    tuples = getTuples(stream);
+
+    assert(tuples.size() == 5);
+    assertOrder(tuples, 0,2,1,3,4);
+
+    del("*:*");
+    commit();
+
+    testTuple();
+    testSpacesInParams();
+    testNonePartitionKeys();
+    testTrace();
+    testUniqueStream();
+    testRankStream();
+    testMergeStream();
+    testReducerStream();
+    testZeroReducerStream();
+    testParallelEOF();
+    testParallelUniqueStream();
+    testParallelRankStream();
+    testParallelMergeStream();
+    testParallelReducerStream();
+    testZeroParallelReducerStream();
+  }
+
+  protected Map mapParams(String... vals) {
+    Map params = new HashMap();
+    String k = null;
+    for(String val : vals) {
+      if(k == null) {
+        k = val;
+      } else {
+        params.put(k, val);
+        k = null;
+      }
+    }
+
+    return params;
+  }
+
+  protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+    tupleStream.open();
+    List<Tuple> tuples = new ArrayList();
+    for(;;) {
+      Tuple t = tupleStream.read();
+      if(t.EOF) {
+        break;
+      } else {
+        tuples.add(t);
+      }
+    }
+    tupleStream.close();
+    return tuples;
+  }
+
+  protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
+    int i = 0;
+    for(int val : ids) {
+      Tuple t = tuples.get(i);
+      Long tip = (Long)t.get("id");
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
+    return true;
+  }
+
+  protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
+    List group = (List)tuple.get("tuples");
+    int i=0;
+    for(int val : ids) {
+      Map t = (Map)group.get(i);
+      Long tip = (Long)t.get("id");
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
+    return true;
+  }
+
+  protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
+    if(maps.size() != ids.length) {
+      throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
+    }
+
+    int i=0;
+    for(int val : ids) {
+      Map t = maps.get(i);
+      Long tip = (Long)t.get("id");
+      if(tip.intValue() != val) {
+        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      }
+      ++i;
+    }
+    return true;
+  }
+
+  public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+    long lv = (long)tuple.get(fieldName);
+    if(lv != l) {
+      throw new Exception("Longs not equal:"+l+" : "+lv);
+    }
+
+    return true;
+  }
+
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = getDoc(fields);
+    indexDoc(doc);
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParserTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParserTest.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParserTest.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParserTest.java Mon May 11 12:37:18 2015
@@ -0,0 +1,103 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.junit.Test;
+
+/**
+ **/
+
+public class StreamExpressionParserTest extends LuceneTestCase {
+
+  public StreamExpressionParserTest() {
+    super();
+  }
+  
+  
+  @Test
+  public void testParsing() throws Exception{
+    StreamExpression actual, expected;
+        
+    actual = StreamExpressionParser.parse("aliases(a_i=alias.a_i)");
+    expected = new StreamExpression("aliases")
+                    .withParameter(new StreamExpressionNamedParameter("a_i", "alias.a_i"));
+    assertEquals(expected,actual);
+    
+    actual = StreamExpressionParser.parse("search(a,b)");
+    expected = new StreamExpression("search").withParameter("a").withParameter("b");
+    assertEquals(expected, actual);
+
+    actual = StreamExpressionParser.parse("search(collection1, q=*:*, sort=\"fieldA desc, fieldB asc, fieldC asc\")");
+    expected = new StreamExpression("search")
+                    .withParameter(new StreamExpressionValue("collection1"))
+                    .withParameter(new StreamExpressionNamedParameter("q").withParameter("*:*"))
+                    .withParameter(new StreamExpressionNamedParameter("sort").withParameter("fieldA desc, fieldB asc, fieldC asc"));
+    assertEquals(expected,actual);
+    
+    actual = StreamExpressionParser.parse("unique(search(collection1, q=*:*, sort=\"fieldA desc, fieldB asc, fieldC asc\"))");
+    expected = new StreamExpression("unique")
+                .withParameter(new StreamExpression("search")
+                    .withParameter(new StreamExpressionValue("collection1"))
+                    .withParameter(new StreamExpressionNamedParameter("q").withParameter("*:*"))
+                    .withParameter(new StreamExpressionNamedParameter("sort").withParameter("fieldA desc, fieldB asc, fieldC asc"))
+                );
+    assertEquals(expected,actual);
+    
+    actual = StreamExpressionParser.parse("unique(search(collection1, q=*:*, sort=\"fieldA desc, fieldB asc, fieldC asc\"), alt=search(collection1, foo=bar))");
+    expected = new StreamExpression("unique")
+                .withParameter(new StreamExpression("search")
+                    .withParameter(new StreamExpressionValue("collection1"))
+                    .withParameter(new StreamExpressionNamedParameter("q").withParameter("*:*"))
+                    .withParameter(new StreamExpressionNamedParameter("sort").withParameter("fieldA desc, fieldB asc, fieldC asc")))
+                .withParameter(new StreamExpressionNamedParameter("alt")
+                    .withParameter(new StreamExpression("search")
+                      .withParameter("collection1")
+                      .withParameter(new StreamExpressionNamedParameter("foo")
+                        .withParameter("bar"))));
+    assertEquals(expected,actual);
+    
+    actual = StreamExpressionParser.parse("innerJoin("
+                                + "left=search(collection1, q=*:*, fl=\"fieldA,fieldB,fieldC\", sort=\"fieldA asc, fieldB asc\"),"
+                                + "right=search(collection2, q=*:*, fl=\"fieldA,fieldD\", sort=fieldA asc),"
+                                + "on(equals(fieldA), notEquals(fieldC,fieldD))"
+                                + ")");
+    expected = new StreamExpression("innerJoin")
+                .withParameter(new StreamExpressionNamedParameter("left")
+                  .withParameter(new StreamExpression("search")
+                    .withParameter("collection1")
+                    .withParameter(new StreamExpressionNamedParameter("q","*:*"))
+                    .withParameter(new StreamExpressionNamedParameter("fl","fieldA,fieldB,fieldC"))
+                    .withParameter(new StreamExpressionNamedParameter("sort","fieldA asc, fieldB asc"))))
+                .withParameter(new StreamExpressionNamedParameter("right")
+                  .withParameter(new StreamExpression("search")
+                    .withParameter("collection2")
+                    .withParameter(new StreamExpressionNamedParameter("q","*:*"))
+                    .withParameter(new StreamExpressionNamedParameter("fl","fieldA,fieldD"))
+                    .withParameter(new StreamExpressionNamedParameter("sort","fieldA asc"))))
+                .withParameter(new StreamExpression("on")
+                  .withParameter(new StreamExpression("equals").withParameter("fieldA"))
+                  .withParameter(new StreamExpression("notEquals").withParameter("fieldC").withParameter("fieldD")));
+    assertEquals(expected,actual);    
+  }
+  
+}