You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/07/22 20:55:00 UTC
svn commit: r1692319 - in /lucene/dev/branches/branch_5x: ./ lucene/ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/
solr/core/src/test-files/solr/collection1/conf/
solr/core/src/test/org/apache/solr/handler/ solr/licenses/ solr/solrj/ sol...
Author: jbernste
Date: Wed Jul 22 18:54:59 2015
New Revision: 1692319
URL: http://svn.apache.org/r1692319
Log:
SOLR-7560: Parallel SQL Support
Added:
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
- copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
lucene/dev/branches/branch_5x/solr/core/src/test-files/solr/collection1/conf/schema-sql.xml
- copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/schema-sql.xml
lucene/dev/branches/branch_5x/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml
- copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
- copied unchanged from r1685497, lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
lucene/dev/branches/branch_5x/solr/licenses/antlr4-runtime-4.5.jar.sha1
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/antlr4-runtime-4.5.jar.sha1
lucene/dev/branches/branch_5x/solr/licenses/antlr4-runtime-LICENSE-BSD.txt
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/antlr4-runtime-LICENSE-BSD.txt
lucene/dev/branches/branch_5x/solr/licenses/antlr4-runtime-NOTICE.txt
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/antlr4-runtime-NOTICE.txt
lucene/dev/branches/branch_5x/solr/licenses/presto-parser-0.108.jar.sha1
- copied unchanged from r1687603, lucene/dev/trunk/solr/licenses/presto-parser-0.108.jar.sha1
lucene/dev/branches/branch_5x/solr/licenses/presto-parser-LICENSE-ASL.txt
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/presto-parser-LICENSE-ASL.txt
lucene/dev/branches/branch_5x/solr/licenses/presto-parser-NOTICE.txt
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/presto-parser-NOTICE.txt
lucene/dev/branches/branch_5x/solr/licenses/slice-0.10.jar.sha1
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/slice-0.10.jar.sha1
lucene/dev/branches/branch_5x/solr/licenses/slice-LICENSE-ASL.txt
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/slice-LICENSE-ASL.txt
lucene/dev/branches/branch_5x/solr/licenses/slice-NOTICE.txt
- copied unchanged from r1685497, lucene/dev/trunk/solr/licenses/slice-NOTICE.txt
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/HashKey.java
- copied unchanged from r1685497, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/HashKey.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
- copied unchanged from r1685497, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/
- copied from r1685497, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/lucene/ (props changed)
lucene/dev/branches/branch_5x/lucene/ivy-versions.properties
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/ivy.xml
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
lucene/dev/branches/branch_5x/solr/licenses/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
Modified: lucene/dev/branches/branch_5x/lucene/ivy-versions.properties
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/ivy-versions.properties?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/ivy-versions.properties (original)
+++ lucene/dev/branches/branch_5x/lucene/ivy-versions.properties Wed Jul 22 18:54:59 2015
@@ -20,6 +20,8 @@ com.codahale.metrics.version = 3.0.1
/com.cybozu.labs/langdetect = 1.1-20120112
/com.drewnoakes/metadata-extractor = 2.6.2
+/com.facebook.presto/presto-parser = 0.108
+
com.fasterxml.jackson.core.version = 2.5.4
/com.fasterxml.jackson.core/jackson-annotations = ${com.fasterxml.jackson.core.version}
/com.fasterxml.jackson.core/jackson-core = ${com.fasterxml.jackson.core.version}
@@ -68,6 +70,7 @@ com.sun.jersey.version = 1.9
/de.l3s.boilerpipe/boilerpipe = 1.1.0
/dom4j/dom4j = 1.6.1
/hsqldb/hsqldb = 1.8.0.10
+/io.airlift/slice = 0.10
/io.netty/netty = 3.7.0.Final
/jakarta-regexp/jakarta-regexp = 1.4
/javax.activation/activation = 1.1.1
@@ -87,6 +90,7 @@ com.sun.jersey.version = 1.9
/net.sourceforge.jmatio/jmatio = 1.0
/net.sourceforge.nekohtml/nekohtml = 1.9.17
/org.antlr/antlr-runtime = 3.5
+/org.antlr/antlr4-runtime = 4.5
/org.apache.ant/ant = 1.8.2
/org.apache.avro/avro = 1.7.5
/org.apache.commons/commons-compress = 1.8.1
Modified: lucene/dev/branches/branch_5x/solr/core/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/ivy.xml?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/ivy.xml (original)
+++ lucene/dev/branches/branch_5x/solr/core/ivy.xml Wed Jul 22 18:54:59 2015
@@ -129,6 +129,10 @@
<!-- StatsComponents percentiles Dependencies-->
<dependency org="com.tdunning" name="t-digest" rev="${/com.tdunning/t-digest}" conf="compile->*"/>
+ <dependency org="com.facebook.presto" name="presto-parser" rev="${/com.facebook.presto/presto-parser}"/>
+ <dependency org="org.antlr" name="antlr4-runtime" rev="${/org.antlr/antlr4-runtime}"/>
+ <dependency org="io.airlift" name="slice" rev="${/io.airlift/slice}"/>
+
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
</ivy-module>
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Wed Jul 22 18:54:59 2015
@@ -45,7 +45,7 @@ import org.apache.solr.common.util.Base6
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
- private SolrClientCache clientCache = new SolrClientCache();
+ static SolrClientCache clientCache = new SolrClientCache();
private StreamFactory streamFactory = new StreamFactory();
public void inform(SolrCore core) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Wed Jul 22 18:54:59 2015
@@ -58,15 +58,27 @@ public class Tuple implements Cloneable
}
public String getString(Object key) {
- return (String)this.fields.get(key);
+ return this.fields.get(key).toString();
}
public Long getLong(Object key) {
- return (Long)this.fields.get(key);
+ Object o = this.fields.get(key);
+ if(o instanceof Long) {
+ return (Long)o;
+ } else {
+ //Attempt to parse the long
+ return Long.parseLong(o.toString());
+ }
}
public Double getDouble(Object key) {
- return (Double)this.fields.get(key);
+ Object o = this.fields.get(key);
+ if(o instanceof Double) {
+ return (Double)o;
+ } else {
+ //Attempt to parse the double
+ return Double.parseDouble(o.toString());
+ }
}
public List<String> getStrings(Object key) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1692319&r1=1692318&r2=1692319&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Wed Jul 22 18:54:59 2015
@@ -39,6 +39,13 @@ import org.apache.solr.client.solrj.io.s
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.Metric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrInputDocument;
@@ -486,6 +493,241 @@ public class StreamingTest extends Abstr
commit();
}
+ private void testRollupStream() throws Exception {
+
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+ streamFactory.withCollectionZkHost("collection1", zkHost);
+
+ Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+ List<Tuple> tuples = getTuples(rollupStream);
+
+ assert(tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+
+
+
+ del("*:*");
+ commit();
+ }
+
+
+
+
+ private void testParallelRollupStream() throws Exception {
+
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+ streamFactory.withCollectionZkHost("collection1", zkHost);
+
+ Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "partitionKeys", "a_s");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+ ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ List<Tuple> tuples = getTuples(parallelStream);
+
+ assert(tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+
+ del("*:*");
+ commit();
+ }
+
private void testZeroParallelReducerStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
@@ -782,8 +1024,8 @@ public class StreamingTest extends Abstr
stream = new CloudSolrStream(zkHost, "collection1", params);
tuples = getTuples(stream);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0,2,1,3,4);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
del("*:*");
commit();
@@ -796,11 +1038,13 @@ public class StreamingTest extends Abstr
testRankStream();
testMergeStream();
testReducerStream();
+ testRollupStream();
testZeroReducerStream();
testParallelEOF();
testParallelUniqueStream();
testParallelRankStream();
testParallelMergeStream();
+ testParallelRollupStream();
testParallelReducerStream();
testZeroParallelReducerStream();
}