You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/03/25 20:42:26 UTC
svn commit: r1669212 [2/2] - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/
solr/core/src/java/org/apache/solr/response/
solr/core/src/java/org/apache/solr/search/
solr/core/src/test-files/solr/collec...
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java?rev=1669212&r1=1665391&r2=1669212&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java Wed Mar 25 19:42:25 2015
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.io;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -127,323 +128,85 @@ public class StreamingTest extends Abstr
}
- private void testHashJoinStream() throws Exception {
- indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000");
- indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000");
- indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0");
-
- commit();
+ private void testSpacesInParams() throws Exception {
String zkHost = zkServer.getZkAddress();
- //Test one-to-one
- Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
- CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- Map fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
- CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
- String[] keys = {"a_f"};
-
- HashJoinStream fstream = new HashJoinStream(streamA, streamB, keys);
- List<Tuple> tuples = getTuples(fstream);
-
- assert(tuples.size() == 1);
- assertOrder(tuples, 0);
- assertLong(tuples.get(0), "join_i", 1000);
-
-
- //Test one-to-many
-
- paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- fstream = new HashJoinStream(streamA, streamB, keys);
- tuples = getTuples(fstream);
+ Map params = mapParams("q","*:*","fl","id , a_s , a_i , a_f","sort", "a_f asc , a_i asc");
- assert(tuples.size() == 2);
- assertOrder(tuples, 0,0);
- assertLong(tuples.get(0), "join_i", 1000);
- assertLong(tuples.get(1), "join_i", 2000);
+ //CloudSolrStream compares the values of the sort with the fl field.
+ //The constructor will throw an exception if the sort fields do not have
+ //a matching value in the field list.
- //Test many-to-one
-
- paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- fstream = new HashJoinStream(streamA, streamB, keys);
- tuples = getTuples(fstream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 2,0);
- assertLong(tuples.get(0), "join_i", 2000);
- assertLong(tuples.get(1), "join_i", 2000);
-
- //Test many-to-many
-
- paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_s desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- fstream = new HashJoinStream(streamA, streamB, keys);
- tuples = getTuples(fstream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 7,7,0,0);
- assertLong(tuples.get(0), "join_i", 1000);
- assertLong(tuples.get(1), "join_i", 2000);
- assertLong(tuples.get(2), "join_i", 1000);
- assertLong(tuples.get(3), "join_i", 2000);
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
del("*:*");
commit();
}
- private void testMergeJoinStream() throws Exception {
+ private void testNonePartitionKeys() throws Exception {
- indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000");
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000");
- indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
String zkHost = zkServer.getZkAddress();
- //Test one-to-one
- Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
- CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- Map fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
- CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
- String[] keys = {"a_f"};
-
- MergeJoinStream fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- List<Tuple> tuples = getTuples(fstream);
-
- assert(tuples.size() == 1);
- assertOrder(tuples, 0);
- assertLong(tuples.get(0), "join_i", 1000);
-
-
- //Test one-to-many
-
- paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- tuples = getTuples(fstream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 0,0);
- assertLong(tuples.get(0), "join_i", 1000);
- assertLong(tuples.get(1), "join_i", 2000);
-
- //Test many-to-one
-
- paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- tuples = getTuples(fstream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 2,0);
- assertLong(tuples.get(0), "join_i", 2000);
- assertLong(tuples.get(1), "join_i", 2000);
-
- //Test many-to-many
-
- paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
+ Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "none");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new AscFieldComp("a_s"));
- fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- tuples = getTuples(fstream);
+ List<Tuple> tuples = getTuples(pstream);
- assert(tuples.size() == 4);
- assertOrder(tuples, 7,7,0,0);
- assertLong(tuples.get(0), "join_i", 1000);
- assertLong(tuples.get(1), "join_i", 2000);
- assertLong(tuples.get(2), "join_i", 1000);
- assertLong(tuples.get(3), "join_i", 2000);
+ assert(tuples.size() == 20); // Each tuple will be double counted.
del("*:*");
commit();
}
- private void testParallelMergeJoinStream() throws Exception {
+
+
+
+ private void testParallelUniqueStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000");
+ indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000");
- indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0");
+ indexr(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1");
+ indexr(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5");
+ indexr(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5");
+ indexr(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4");
commit();
String zkHost = zkServer.getZkAddress();
- //Test one-to-one
- Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
- CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- Map fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
- CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
- String[] keys = {"a_f"};
-
- MergeJoinStream mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- ParallelStream fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
-
- List<Tuple> tuples = getTuples(fstream);
-
- assert(tuples.size() == 1);
- assertOrder(tuples, 0);
- assertLong(tuples.get(0), "join_i", 1000);
-
-
- //Test one-to-many
-
- paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
-
- tuples = getTuples(fstream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 0,0);
- assertLong(tuples.get(0), "join_i", 1000);
- assertLong(tuples.get(1), "join_i", 2000);
-
- //Test many-to-one
-
- paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
-
- tuples = getTuples(fstream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 2,0);
- assertLong(tuples.get(0), "join_i", 2000);
- assertLong(tuples.get(1), "join_i", 2000);
-
- //Test many-to-many
-
- paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f");
- streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- fieldMappings = new HashMap();
- fieldMappings.put("id","streamB.id");
-
- paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f");
- streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
- streamB.setFieldMappings(fieldMappings);
-
-
- mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f"));
- fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f"));
+ Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+ UniqueStream ustream = new UniqueStream(stream, new AscFieldComp("a_f"));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new AscFieldComp("a_f"));
+ List<Tuple> tuples = getTuples(pstream);
+ assert(tuples.size() == 5);
+ assertOrder(tuples, 0,1,3,4,6);
- tuples = getTuples(fstream);
+ //Test the eofTuples
- assert(tuples.size() == 4);
- assertOrder(tuples, 7,7,0,0);
- assertLong(tuples.get(0), "join_i", 1000);
- assertLong(tuples.get(1), "join_i", 2000);
- assertLong(tuples.get(2), "join_i", 1000);
- assertLong(tuples.get(3), "join_i", 2000);
+ Map<String,Tuple> eofTuples = pstream.getEofTuples();
+ assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
del("*:*");
commit();
@@ -478,467 +241,188 @@ public class StreamingTest extends Abstr
commit();
}
- private void testRollupStream() throws Exception {
- indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
- indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
- indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ private void testParallelRankStream() throws Exception {
- commit();
-
- String zkHost = zkServer.getZkAddress();
- Bucket[] buckets = {new Bucket("a_s")};
- Metric[] metrics = {new SumMetric("a_i", false),
- new MeanMetric("a_i", false),
- new CountMetric(),
- new MinMetric("a_i", false),
- new MaxMetric("a_i", false)};
-
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- RollupStream rstream = new RollupStream(stream, buckets, metrics);
- rstream.open();
- Tuple tuple = rstream.read();
- String b = (String)tuple.get("buckets");
- List<Double> values = (List<Double>)tuple.get("metricValues");
- assert(b.equals("hello0"));
- assert(values.get(0) == 102.0d);
- assert(values.get(1) == 51.0d);
- assert(values.get(2) == 2.0d);
- assert(values.get(3) == 2.0d);
- assert(values.get(4) == 100.0d);
-
- tuple = rstream.read();
- b = (String)tuple.get("buckets");
- values = (List<Double>)tuple.get("metricValues");
- assert(b.equals("hello1"));
- assert(values.get(0) == 3.0d);
- assert(values.get(1) == 1.0d);
- assert(values.get(2) == 3.0d);
- assert(values.get(3) == 1.0d);
- assert(values.get(4) == 1.0d);
-
-
- tuple = rstream.read();
- b = (String)tuple.get("buckets");
- values = (List<Double>)tuple.get("metricValues");
- assert(b.equals("hello3"));
- assert(values.get(0) == 7.0d);
- assert(values.get(1) == 3.5d);
- assert(values.get(2) == 2.0d);
- assert(values.get(3) == 3.0d);
- assert(values.get(4) == 4.0d);
-
- tuple = rstream.read();
- assert(tuple.EOF);
-
- rstream.close();
- del("*:*");
- commit();
- }
-
- private void testParallelRollupStream() throws Exception {
- indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
- indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+ indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1");
+ indexr(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1");
+ indexr(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1");
+ indexr(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1");
+ indexr(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1");
+ indexr(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1");
commit();
String zkHost = zkServer.getZkAddress();
- Bucket[] buckets = {new Bucket("a_s")};
- Metric[] metrics = {new SumMetric("a_i", false),
- new MeanMetric("a_i", false),
- new CountMetric(),
- new MinMetric("a_i", false),
- new MaxMetric("a_i", false)};
-
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc","partitionKeys","a_s");
+ Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- RollupStream rostream = new RollupStream(stream, buckets, metrics);
- ParallelStream rstream = new ParallelStream(zkHost,"collection1", rostream, 2, new AscFieldComp("buckets"));
-
- rstream.open();
- Tuple tuple = rstream.read();
- String b = (String)tuple.get("buckets");
- List<Double> values = (List<Double>)tuple.get("metricValues");
- assert(b.equals("hello0"));
- assert(values.get(0) == 102.0d);
- assert(values.get(1) == 51.0d);
- assert(values.get(2) == 2.0d);
- assert(values.get(3) == 2.0d);
- assert(values.get(4) == 100.0d);
-
- tuple = rstream.read();
- b = (String)tuple.get("buckets");
- values = (List<Double>)tuple.get("metricValues");
- assert(b.equals("hello1"));
- assert(values.get(0) == 3.0d);
- assert(values.get(1) == 1.0d);
- assert(values.get(2) == 3.0d);
- assert(values.get(3) == 1.0d);
- assert(values.get(4) == 1.0d);
-
-
- tuple = rstream.read();
- b = (String)tuple.get("buckets");
- values = (List<Double>)tuple.get("metricValues");
- assert(b.equals("hello3"));
- assert(values.get(0) == 7.0d);
- assert(values.get(1) == 3.5d);
- assert(values.get(2) == 2.0d);
- assert(values.get(3) == 3.0d);
- assert(values.get(4) == 4.0d);
+ RankStream rstream = new RankStream(stream, 11, new DescFieldComp("a_i"));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_i"));
+ List<Tuple> tuples = getTuples(pstream);
- tuple = rstream.read();
- assert(tuple.EOF);
+ assert(tuples.size() == 10);
+ assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
- rstream.close();
del("*:*");
commit();
}
+ private void testTrace() throws Exception {
-
- private void testMetricStream() throws Exception {
-
- indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
- indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
String zkHost = zkServer.getZkAddress();
- Bucket[] buckets = {new Bucket("a_s")};
- Metric[] metrics = {new SumMetric("a_i", false),
- new MeanMetric("a_i", false),
- new CountMetric(),
- new MinMetric("a_i", false),
- new MaxMetric("a_i", false)};
-
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5);
- getTuples(mstream);
-
- BucketMetrics[] bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 3);
-
- //Bucket should be is descending order based on Metric 0, which is the SumMetric.
-
- assert(bucketMetrics[0].getKey().toString().equals("hello0"));
- assert(bucketMetrics[1].getKey().toString().equals("hello3"));
- assert(bucketMetrics[2].getKey().toString().equals("hello1"));
-
- assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics
-
-
- assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
- assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d);
-
-
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5);
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
-
- assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics
-
- assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
- assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d);
-
- indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a fourth record.
- commit();
-
- //Test desc comp with more buckets then priority queue can hold.
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3);
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 3);
- assert(bucketMetrics[0].getKey().toString().equals("hello4"));
- assert(bucketMetrics[1].getKey().toString().equals("hello0"));
- assert(bucketMetrics[2].getKey().toString().equals("hello3"));
-
- //Test asc comp with more buckets then priority queue can hold.
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3);
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 3);
- assert(bucketMetrics[0].getKey().toString().equals("hello1"));
- assert(bucketMetrics[1].getKey().toString().equals("hello3"));
- assert(bucketMetrics[2].getKey().toString().equals("hello0"));
-
-
- //Test with no buckets
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, metrics, "metric1");
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 1);
- assert(bucketMetrics[0].getKey().toString().equals("metrics"));
- assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics
+ //Test with spaces in the parameter lists.
+ Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ stream.setTrace(true);
+ List<Tuple> tuples = getTuples(stream);
+ assert(tuples.get(0).get("_COLLECTION_").equals("collection1"));
+ assert(tuples.get(1).get("_COLLECTION_").equals("collection1"));
+ assert(tuples.get(2).get("_COLLECTION_").equals("collection1"));
+ assert(tuples.get(3).get("_COLLECTION_").equals("collection1"));
del("*:*");
commit();
}
- private void testParallelMetricStream() throws Exception {
-
- indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0");
- indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
- indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1");
- indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1");
-
- commit();
-
- String zkHost = zkServer.getZkAddress();
-
- Bucket[] buckets = {new Bucket("a_s")};
- Metric[] metrics = {new SumMetric("a_i", false),
- new MeanMetric("a_i", false),
- new CountMetric(),
- new MinMetric("a_i", false),
- new MaxMetric("a_i", false)};
-
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
- CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5);
- ParallelStream pstream = new ParallelStream(zkHost,"collection1",mstream,2,new AscFieldComp("a_i"));
- getTuples(pstream);
-
- BucketMetrics[] bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 3);
-
- //Bucket should be is descending order based on Metric 0, which is the SumMetric.
-
- assert(bucketMetrics[0].getKey().toString().equals("hello0"));
- assert(bucketMetrics[1].getKey().toString().equals("hello3"));
- assert(bucketMetrics[2].getKey().toString().equals("hello1"));
- assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics
+ private void testReducerStream() throws Exception {
- assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
- assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d);
-
-
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5);
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
-
- assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics
- assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics
-
- assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d);
- assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d);
-
- indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a fourth record.
- commit();
-
- //Test desc comp with more buckets then priority queue can hold.
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3);
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 3);
- assert(bucketMetrics[0].getKey().toString().equals("hello4"));
- assert(bucketMetrics[1].getKey().toString().equals("hello0"));
- assert(bucketMetrics[2].getKey().toString().equals("hello3"));
-
- //Test asc comp with more buckets then priority queue can hold.
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3);
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 3);
- assert(bucketMetrics[0].getKey().toString().equals("hello1"));
- assert(bucketMetrics[1].getKey().toString().equals("hello3"));
- assert(bucketMetrics[2].getKey().toString().equals("hello0"));
-
-
- //Test with no buckets
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
- stream = new CloudSolrStream(zkHost, "collection1", params);
- mstream = new MetricStream(stream, metrics, "metric1");
- getTuples(mstream);
-
- bucketMetrics = mstream.getBucketMetrics();
- assert(bucketMetrics.length == 1);
- assert(bucketMetrics[0].getKey().toString().equals("metrics"));
- assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics
-
- del("*:*");
- commit();
- }
-
- private void testGroupByStream() throws Exception {
-
- indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0");
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "1");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
- //Test CloudSolrStream and SumStream over an int field
String zkHost = zkServer.getZkAddress();
- Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc");
+ //Test with spaces in the parameter lists.
+ Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
- GroupByStream gstream = new GroupByStream(stream, new AscFieldComp("a_s"), new DescFieldComp("a_i"), 5);
+ ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
- List<Tuple> tuples = getTuples(gstream);
+ List<Tuple> tuples = getTuples(rstream);
assert(tuples.size() == 3);
- assertOrder(tuples, 2,3,4);
- assertGroupOrder(tuples.get(0), 1, 0);
-
- del("*:*");
- commit();
- }
+ assertOrder(tuples, 0,3,4);
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps();
+ assertMaps(maps0, 0, 2,1, 9);
- private void testFilterStream() throws Exception {
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps();
+ assertMaps(maps1, 3, 5, 7, 8);
- indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
- indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps();
+ assertMaps(maps2, 4, 6);
- commit();
- //Test CloudSolrStream and SumStream over an int field
- String zkHost = zkServer.getZkAddress();
-
- Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc");
- CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc");
- CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
-
-
- FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
- List<Tuple> tuples = getTuples(fstream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 0,2);
del("*:*");
commit();
}
- private void testParallelStream() throws Exception {
- indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+ private void testParallelReducerStream() throws Exception {
+ // Runs a ReducerStream keyed on a_s underneath a ParallelStream and checks both the order of the returned tuples and the ids of the maps attached to each tuple.
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
+ indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
+ indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
+ indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
+ indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
+ indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
+ indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); // a_f is unique per document (1..10), so the a_f subsort is unambiguous
commit();
String zkHost = zkServer.getZkAddress();
- Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
- CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
-
- Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s");
- CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+ Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s")); // keyed on a_s, the same field used as primary sort and as partitionKeys
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new AscFieldComp("a_s")); // NOTE(review): the 2 is presumably the worker count - confirm against ParallelStream
- FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
- ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 2, new AscFieldComp("a_s"));
List<Tuple> tuples = getTuples(pstream);
- assert(tuples.size() == 2);
- assertOrder(tuples, 0,2);
+ assert(tuples.size() == 3); // three distinct a_s values were indexed: hello0, hello3, hello4
+ assertOrder(tuples, 0,3,4); // ids 0, 3 and 4 are the lowest-a_f documents of hello0, hello3 and hello4
- del("*:*");
- commit();
- }
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps();
+ assertMaps(maps0, 0, 2, 1, 9); // hello0 documents ordered by a_f: 1, 2, 5, 10
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps();
+ assertMaps(maps1, 3, 5, 7, 8); // hello3 documents ordered by a_f: 3, 6, 8, 9
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps();
+ assertMaps(maps2, 4, 6); // hello4 documents ordered by a_f: 4, 7
- private void testParallelHashJoinStream() {
+ //Test Descending with Ascending subsort: tuples are expected in a_s descending order while the maps inside each tuple stay in a_f ascending order
- }
+ paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+ stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+ rstream = new ReducerStream(stream, new DescFieldComp("a_s"));
+ pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_s"));
- private void testParallelGroupByStream() throws Exception {
+ tuples = getTuples(pstream);
- indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
- indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
- indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3");
- indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4");
- indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ assert(tuples.size() == 3);
+ assertOrder(tuples, 4,3,0); // same three ids as the ascending case, in reverse order
- commit();
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps();
+ assertMaps(maps0, 4, 6);
+
+
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps();
+ assertMaps(maps1, 3, 5, 7, 8);
+
+
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps();
+ assertMaps(maps2, 0, 2, 1, 9);
- String zkHost = zkServer.getZkAddress();
- Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
- CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
- GroupByStream gstream = new GroupByStream(stream, new AscFieldComp("a_s"), new AscFieldComp("a_i"),5);
- ParallelStream pstream = new ParallelStream(zkHost,"collection1", gstream, 2, new AscFieldComp("a_s"));
- List<Tuple> tuples = getTuples(pstream);
- assert(tuples.size() == 3);
- assertOrder(tuples, 0,1,2);
- assertGroupOrder(tuples.get(0),3,4);
del("*:*");
commit();
}
-
private void testTuple() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
@@ -947,7 +431,7 @@ public class StreamingTest extends Abstr
String zkHost = zkServer.getZkAddress();
- Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc", "partitionKeys","a_s");
+ Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
List<Tuple> tuples = getTuples(stream);
Tuple tuple = tuples.get(0);
@@ -978,7 +462,6 @@ public class StreamingTest extends Abstr
commit();
}
-
private void testMergeStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
@@ -1048,6 +531,104 @@ public class StreamingTest extends Abstr
}
+ private void testParallelMergeStream() throws Exception {
+ // Merges two a_i-sorted CloudSolrStreams with a MergeStream wrapped in a ParallelStream and checks the resulting tuple order, first ascending and then descending.
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+ indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+ indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+ indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+ indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+ indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+ // Every document above has a distinct a_i value, so sorting on a_i yields exactly one valid order.
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ //Test ascending: streamA and streamB select disjoint id sets (5 + 4 documents), each sorted by a_i asc and partitioned on a_i
+ Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+ CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+ Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+ CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+ // The same a_i comparator is handed to both the MergeStream and the ParallelStream. NOTE(review): the 2 is presumably the worker count - confirm against ParallelStream.
+ MergeStream mstream = new MergeStream(streamA, streamB, new AscFieldComp("a_i"));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new AscFieldComp("a_i"));
+ List<Tuple> tuples = getTuples(pstream);
+ // Expected ids correspond to a_i values 0, 1, 2, 3, 4, 7, 8, 11, 100.
+ assert(tuples.size() == 9);
+ assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
+
+ //Test descending: id 7 is left out of streamA this time, so 4 + 4 = 8 tuples are expected
+ paramsA = mapParams("q","id:(4 1 8 9)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
+ streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+ paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
+ streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+
+ mstream = new MergeStream(streamA, streamB, new DescFieldComp("a_i"));
+ pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new DescFieldComp("a_i"));
+ tuples = getTuples(pstream);
+ // Expected ids correspond to a_i values 100, 11, 8, 4, 3, 2, 1, 0.
+ assert(tuples.size() == 8);
+ assertOrder(tuples, 9,8,6,4,3,2,1,0);
+ // Clean up: del("*:*") followed by commit().
+ del("*:*");
+ commit();
+ }
+
+ private void testParallelEOF() throws Exception {
+ // Checks that ParallelStream exposes EOF tuples via getEofTuples() and that their "count" values add up to the number of tuples read.
+ indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+ indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+ indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+ indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+ indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+ indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
+ indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
+ indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
+ indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
+ indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
+
+ commit();
+
+ String zkHost = zkServer.getZkAddress();
+
+ //Test ascending: the two queries select disjoint id sets, 5 + 4 = 9 documents in total
+ Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+ CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+
+ Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
+ CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+ // NOTE(review): CountStream presumably supplies the "count" field read from the EOF tuples below - confirm against CountStream.
+ MergeStream mstream = new MergeStream(streamA, streamB, new AscFieldComp("a_i"));
+ CountStream cstream = new CountStream(mstream);
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new AscFieldComp("a_i"));
+ List<Tuple> tuples = getTuples(pstream);
+
+ assert(tuples.size() == 9);
+ Map<String, Tuple> eofTuples = pstream.getEofTuples();
+ assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker.
+ // Sum the "count" field over every EOF tuple in the map.
+ long totalCount = 0;
+
+ Iterator<Tuple> it = eofTuples.values().iterator();
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ totalCount += t.getLong("count");
+ }
+ // The summed counts must equal the number of tuples returned by getTuples.
+ assert(tuples.size() == totalCount);
+ // Clean up: del("*:*") followed by commit().
+ del("*:*");
+ commit();
+ }
+
+
+
@Test
public void streamTests() throws Exception {
assertNotNull(cloudClient);
@@ -1072,7 +653,7 @@ public class StreamingTest extends Abstr
String zkHost = zkServer.getZkAddress();
Map params = null;
- //Basic CloudSolrStream Test with Ascending Sort
+ //Basic CloudSolrStream Test with Descending Sort
params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
@@ -1081,7 +662,7 @@ public class StreamingTest extends Abstr
assert(tuples.size() == 5);
assertOrder(tuples, 4, 3, 2, 1, 0);
- //With Descending Sort
+ //With Ascending Sort
params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
stream = new CloudSolrStream(zkHost, "collection1", params);
tuples = getTuples(stream);
@@ -1110,21 +691,18 @@ public class StreamingTest extends Abstr
commit();
testTuple();
+ testSpacesInParams();
+ testNonePartitionKeys();
+ testTrace();
testUniqueStream();
- testMetricStream();
- testRollupStream();
testRankStream();
- testFilterStream();
- testGroupByStream();
- testHashJoinStream();
- testMergeJoinStream();
testMergeStream();
- testParallelStream();
- testParallelRollupStream();
- testParallelMetricStream();
- testParallelGroupByStream();
- testParallelHashJoinStream();
- testParallelMergeJoinStream();
+ testReducerStream();
+ testParallelEOF();
+ testParallelUniqueStream();
+ testParallelRankStream();
+ testParallelMergeStream();
+ testParallelReducerStream();
}
protected Map mapParams(String... vals) {
@@ -1179,19 +757,27 @@ public class StreamingTest extends Abstr
return true;
}
- public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
- long lv = (long)tuple.get(fieldName);
- if(lv != l) {
- throw new Exception("Longs not equal:"+l+" : "+lv);
+ protected boolean assertMaps(List<Map> maps, int... ids) throws Exception { // Verifies that maps holds exactly the given ids, position by position; throws on the first mismatch, otherwise returns true.
+ if(maps.size() != ids.length) { // size check first, so maps.get(i) in the loop below stays in range
+ throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
}
+ int i=0; // index into maps, advanced in step with the ids being iterated
+ for(int val : ids) {
+ Map t = maps.get(i);
+ Long tip = (Long)t.get("id"); // the cast assumes the "id" value is a Long; any other non-null type throws ClassCastException
+ if(tip.intValue() != val) { // compared as int, matching the int varargs
+ throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+ }
+ ++i;
+ }
return true;
}
- public boolean assertMetric(Metric metric, double value) throws Exception {
- Double d = metric.getValue();
- if(d.doubleValue() != value) {
- throw new Exception("Unexpected Metric "+d+"!="+value);
+ public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+ long lv = (long)tuple.get(fieldName);
+ if(lv != l) {
+ throw new Exception("Longs not equal:"+l+" : "+lv);
}
return true;