You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/07/22 20:55:00 UTC

svn commit: r1692319 - in /lucene/dev/branches/branch_5x: ./ lucene/ solr/ solr/core/ solr/core/src/java/org/apache/solr/handler/ solr/core/src/test-files/solr/collection1/conf/ solr/core/src/test/org/apache/solr/handler/ solr/licenses/ solr/solrj/ sol...

Author: jbernste
Date: Wed Jul 22 18:54:59 2015
New Revision: 1692319

URL: http://svn.apache.org/r1692319
Log:
SOLR-7560: Parallel SQL Support

Added:
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
      - copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/test-files/solr/collection1/conf/schema-sql.xml
      - copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/schema-sql.xml
    lucene/dev/branches/branch_5x/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml
      - copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
      - copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
    lucene/dev/branches/branch_5x/solr/licenses/antlr4-runtime-4.5.jar.sha1
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/antlr4-runtime-4.5.jar.sha1
    lucene/dev/branches/branch_5x/solr/licenses/antlr4-runtime-LICENSE-BSD.txt
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/antlr4-runtime-LICENSE-BSD.txt
    lucene/dev/branches/branch_5x/solr/licenses/antlr4-runtime-NOTICE.txt
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/antlr4-runtime-NOTICE.txt
    lucene/dev/branches/branch_5x/solr/licenses/presto-parser-0.108.jar.sha1
      - copied unchanged from r1687603, lucene/dev/trunk/solr/licenses/presto-parser-0.108.jar.sha1
    lucene/dev/branches/branch_5x/solr/licenses/presto-parser-LICENSE-ASL.txt
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/presto-parser-LICENSE-ASL.txt
    lucene/dev/branches/branch_5x/solr/licenses/presto-parser-NOTICE.txt
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/presto-parser-NOTICE.txt
    lucene/dev/branches/branch_5x/solr/licenses/slice-0.10.jar.sha1
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/slice-0.10.jar.sha1
    lucene/dev/branches/branch_5x/solr/licenses/slice-LICENSE-ASL.txt
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/slice-LICENSE-ASL.txt
    lucene/dev/branches/branch_5x/solr/licenses/slice-NOTICE.txt
      - copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/slice-NOTICE.txt
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/HashKey.java
      - copied unchanged from r1685497, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/HashKey.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
      - copied unchanged from r1685497, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/
      - copied from r1685497, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/
Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/lucene/   (props changed)
    lucene/dev/branches/branch_5x/lucene/ivy-versions.properties
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/ivy.xml
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
    lucene/dev/branches/branch_5x/solr/licenses/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java

Modified: lucene/dev/branches/branch_5x/lucene/ivy-versions.properties
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/ivy-versions.properties?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/ivy-versions.properties (original)
+++ lucene/dev/branches/branch_5x/lucene/ivy-versions.properties Wed Jul 22 18:54:59 2015
@@ -20,6 +20,8 @@ com.codahale.metrics.version = 3.0.1
 /com.cybozu.labs/langdetect = 1.1-20120112
 /com.drewnoakes/metadata-extractor = 2.6.2
 
+/com.facebook.presto/presto-parser = 0.108
+
 com.fasterxml.jackson.core.version = 2.5.4
 /com.fasterxml.jackson.core/jackson-annotations = ${com.fasterxml.jackson.core.version}
 /com.fasterxml.jackson.core/jackson-core = ${com.fasterxml.jackson.core.version}
@@ -68,6 +70,7 @@ com.sun.jersey.version = 1.9
 /de.l3s.boilerpipe/boilerpipe = 1.1.0
 /dom4j/dom4j = 1.6.1
 /hsqldb/hsqldb = 1.8.0.10
+/io.airlift/slice = 0.10
 /io.netty/netty = 3.7.0.Final
 /jakarta-regexp/jakarta-regexp = 1.4
 /javax.activation/activation = 1.1.1
@@ -87,6 +90,7 @@ com.sun.jersey.version = 1.9
 /net.sourceforge.jmatio/jmatio = 1.0
 /net.sourceforge.nekohtml/nekohtml = 1.9.17
 /org.antlr/antlr-runtime = 3.5
+/org.antlr/antlr4-runtime = 4.5
 /org.apache.ant/ant = 1.8.2
 /org.apache.avro/avro = 1.7.5
 /org.apache.commons/commons-compress = 1.8.1

Modified: lucene/dev/branches/branch_5x/solr/core/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/ivy.xml?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/ivy.xml (original)
+++ lucene/dev/branches/branch_5x/solr/core/ivy.xml Wed Jul 22 18:54:59 2015
@@ -129,6 +129,10 @@
     <!-- StatsComponents percentiles Dependencies-->
     <dependency org="com.tdunning" name="t-digest" rev="${/com.tdunning/t-digest}" conf="compile->*"/>
 
+    <dependency org="com.facebook.presto" name="presto-parser" rev="${/com.facebook.presto/presto-parser}"/>
+    <dependency org="org.antlr" name="antlr4-runtime" rev="${/org.antlr/antlr4-runtime}"/>
+    <dependency org="io.airlift" name="slice" rev="${/io.airlift/slice}"/>
+
     <exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/> 
   </dependencies>
 </ivy-module>

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Wed Jul 22 18:54:59 2015
@@ -45,7 +45,7 @@ import org.apache.solr.common.util.Base6
 
 public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
 
-  private SolrClientCache clientCache = new SolrClientCache();
+  static SolrClientCache clientCache = new SolrClientCache();
   private StreamFactory streamFactory = new StreamFactory();
   
   public void inform(SolrCore core) {

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Wed Jul 22 18:54:59 2015
@@ -58,15 +58,27 @@ public class Tuple implements Cloneable
   }
 
   public String getString(Object key) {
-    return (String)this.fields.get(key);
+    return this.fields.get(key).toString();
   }
 
   public Long getLong(Object key) {
-    return (Long)this.fields.get(key);
+    Object o = this.fields.get(key);
+    if(o instanceof Long) {
+      return (Long)o;
+    } else {
+      //Attempt to parse the long
+      return Long.parseLong(o.toString());
+    }
   }
 
   public Double getDouble(Object key) {
-    return (Double)this.fields.get(key);
+    Object o = this.fields.get(key);
+    if(o instanceof Double) {
+      return (Double)o;
+    } else {
+      //Attempt to parse the double
+      return Double.parseDouble(o.toString());
+    }
   }
 
   public List<String> getStrings(Object key) {

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Wed Jul 22 18:54:59 2015
@@ -39,6 +39,13 @@ import org.apache.solr.client.solrj.io.s
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.stream.UniqueStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
 import org.apache.solr.cloud.AbstractZkTestCase;
 import org.apache.solr.common.SolrInputDocument;
@@ -486,6 +493,241 @@ public class StreamingTest extends Abstr
     commit();
   }
 
+  private void testRollupStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Bucket[] buckets =  {new Bucket("a_s")};
+
+    Metric[] metrics = {new SumMetric("a_i"),
+                        new SumMetric("a_f"),
+                        new MinMetric("a_i"),
+                        new MinMetric("a_f"),
+                        new MaxMetric("a_i"),
+                        new MaxMetric("a_f"),
+                        new MeanMetric("a_i"),
+                        new MeanMetric("a_f"),
+                        new CountMetric()};
+
+    RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+    List<Tuple> tuples = getTuples(rollupStream);
+
+    assert(tuples.size() == 3);
+
+    //Test Long and Double Sums
+
+    Tuple tuple = tuples.get(0);
+    String bucket = tuple.getString("a_s");
+    Double sumi = tuple.getDouble("sum(a_i)");
+    Double sumf = tuple.getDouble("sum(a_f)");
+    Double mini = tuple.getDouble("min(a_i)");
+    Double minf = tuple.getDouble("min(a_f)");
+    Double maxi = tuple.getDouble("max(a_i)");
+    Double maxf = tuple.getDouble("max(a_f)");
+    Double avgi = tuple.getDouble("avg(a_i)");
+    Double avgf = tuple.getDouble("avg(a_f)");
+    Double count = tuple.getDouble("count(*)");
+
+
+    assertTrue(bucket.equals("hello0"));
+    assertTrue(sumi.doubleValue() == 17.0D);
+    assertTrue(sumf.doubleValue() == 18.0D);
+    assertTrue(mini.doubleValue() == 0.0D);
+    assertTrue(minf.doubleValue() == 1.0D);
+    assertTrue(maxi.doubleValue() == 14.0D);
+    assertTrue(maxf.doubleValue() == 10.0D);
+    assertTrue(avgi.doubleValue() == 4.25D);
+    assertTrue(avgf.doubleValue() == 4.5D);
+    assertTrue(count.doubleValue() == 4);
+
+
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
+
+    assertTrue(bucket.equals("hello3"));
+    assertTrue(sumi.doubleValue() == 38.0D);
+    assertTrue(sumf.doubleValue() == 26.0D);
+    assertTrue(mini.doubleValue() == 3.0D);
+    assertTrue(minf.doubleValue() == 3.0D);
+    assertTrue(maxi.doubleValue() == 13.0D);
+    assertTrue(maxf.doubleValue() == 9.0D);
+    assertTrue(avgi.doubleValue() == 9.5D);
+    assertTrue(avgf.doubleValue() == 6.5D);
+    assertTrue(count.doubleValue() == 4);
+
+
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
+
+    assertTrue(bucket.equals("hello4"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(sumf.doubleValue() == 11.0D);
+    assertTrue(mini.doubleValue() == 4.0D);
+    assertTrue(minf.doubleValue() == 4.0D);
+    assertTrue(maxi.doubleValue() == 11.0D);
+    assertTrue(maxf.doubleValue() == 7.0D);
+    assertTrue(avgi.doubleValue() == 7.5D);
+    assertTrue(avgf.doubleValue() == 5.5D);
+    assertTrue(count.doubleValue() == 2);
+
+
+
+    del("*:*");
+    commit();
+  }
+
+
+
+
+  private void testParallelRollupStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+    commit();
+
+    String zkHost = zkServer.getZkAddress();
+    streamFactory.withCollectionZkHost("collection1", zkHost);
+
+    Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+    Bucket[] buckets =  {new Bucket("a_s")};
+
+    Metric[] metrics = {new SumMetric("a_i"),
+                        new SumMetric("a_f"),
+                        new MinMetric("a_i"),
+                        new MinMetric("a_f"),
+                        new MaxMetric("a_i"),
+                        new MaxMetric("a_f"),
+                        new MeanMetric("a_i"),
+                        new MeanMetric("a_f"),
+                        new CountMetric()};
+
+    RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+    ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    List<Tuple> tuples = getTuples(parallelStream);
+
+    assert(tuples.size() == 3);
+
+    //Test Long and Double Sums
+
+    Tuple tuple = tuples.get(0);
+    String bucket = tuple.getString("a_s");
+    Double sumi = tuple.getDouble("sum(a_i)");
+    Double sumf = tuple.getDouble("sum(a_f)");
+    Double mini = tuple.getDouble("min(a_i)");
+    Double minf = tuple.getDouble("min(a_f)");
+    Double maxi = tuple.getDouble("max(a_i)");
+    Double maxf = tuple.getDouble("max(a_f)");
+    Double avgi = tuple.getDouble("avg(a_i)");
+    Double avgf = tuple.getDouble("avg(a_f)");
+    Double count = tuple.getDouble("count(*)");
+
+    assertTrue(bucket.equals("hello0"));
+    assertTrue(sumi.doubleValue() == 17.0D);
+    assertTrue(sumf.doubleValue() == 18.0D);
+    assertTrue(mini.doubleValue() == 0.0D);
+    assertTrue(minf.doubleValue() == 1.0D);
+    assertTrue(maxi.doubleValue() == 14.0D);
+    assertTrue(maxf.doubleValue() == 10.0D);
+    assertTrue(avgi.doubleValue() == 4.25D);
+    assertTrue(avgf.doubleValue() == 4.5D);
+    assertTrue(count.doubleValue() == 4);
+
+    tuple = tuples.get(1);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
+
+    assertTrue(bucket.equals("hello3"));
+    assertTrue(sumi.doubleValue() == 38.0D);
+    assertTrue(sumf.doubleValue() == 26.0D);
+    assertTrue(mini.doubleValue() == 3.0D);
+    assertTrue(minf.doubleValue() == 3.0D);
+    assertTrue(maxi.doubleValue() == 13.0D);
+    assertTrue(maxf.doubleValue() == 9.0D);
+    assertTrue(avgi.doubleValue() == 9.5D);
+    assertTrue(avgf.doubleValue() == 6.5D);
+    assertTrue(count.doubleValue() == 4);
+
+    tuple = tuples.get(2);
+    bucket = tuple.getString("a_s");
+    sumi = tuple.getDouble("sum(a_i)");
+    sumf = tuple.getDouble("sum(a_f)");
+    mini = tuple.getDouble("min(a_i)");
+    minf = tuple.getDouble("min(a_f)");
+    maxi = tuple.getDouble("max(a_i)");
+    maxf = tuple.getDouble("max(a_f)");
+    avgi = tuple.getDouble("avg(a_i)");
+    avgf = tuple.getDouble("avg(a_f)");
+    count = tuple.getDouble("count(*)");
+
+    assertTrue(bucket.equals("hello4"));
+    assertTrue(sumi.longValue() == 15);
+    assertTrue(sumf.doubleValue() == 11.0D);
+    assertTrue(mini.doubleValue() == 4.0D);
+    assertTrue(minf.doubleValue() == 4.0D);
+    assertTrue(maxi.doubleValue() == 11.0D);
+    assertTrue(maxf.doubleValue() == 7.0D);
+    assertTrue(avgi.doubleValue() == 7.5D);
+    assertTrue(avgf.doubleValue() == 5.5D);
+    assertTrue(count.doubleValue() == 2);
+
+    del("*:*");
+    commit();
+  }
+
   private void testZeroParallelReducerStream() throws Exception {
 
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
@@ -782,8 +1024,8 @@ public class StreamingTest extends Abstr
     stream = new CloudSolrStream(zkHost, "collection1", params);
     tuples = getTuples(stream);
 
-    assert(tuples.size() == 5);
-    assertOrder(tuples, 0,2,1,3,4);
+    assert (tuples.size() == 5);
+    assertOrder(tuples, 0, 2, 1, 3, 4);
 
     del("*:*");
     commit();
@@ -796,11 +1038,13 @@ public class StreamingTest extends Abstr
     testRankStream();
     testMergeStream();
     testReducerStream();
+    testRollupStream();
     testZeroReducerStream();
     testParallelEOF();
     testParallelUniqueStream();
     testParallelRankStream();
     testParallelMergeStream();
+    testParallelRollupStream();
     testParallelReducerStream();
     testZeroParallelReducerStream();
   }