You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/04/20 10:20:36 UTC
[03/23] lucene-solr:feature/autoscaling: Squash-merge from master.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 2f2273e..0de3aa0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -131,13 +131,20 @@ public void testUniqueStream() throws Exception {
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
- List<Tuple> tuples = getTuples(ustream);
- assertEquals(4, tuples.size());
- assertOrder(tuples, 0,1,3,4);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+ ustream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(ustream);
+ assertEquals(4, tuples.size());
+ assertOrder(tuples, 0, 1, 3, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -167,15 +174,22 @@ public void testNonePartitionKeys() throws Exception {
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
- SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
-
- assert(tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+ SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
+ assert (tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -193,19 +207,29 @@ public void testParallelUniqueStream() throws Exception {
.add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
- ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
- assertEquals(5, tuples.size());
- assertOrder(tuples, 0, 1, 3, 4, 6);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
- //Test the eofTuples
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+ ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
+ assertEquals(5, tuples.size());
+ assertOrder(tuples, 0, 1, 3, 4, 6);
- Map<String,Tuple> eofTuples = pstream.getEofTuples();
- assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+ //Test the eofTuples
+
+ Map<String, Tuple> eofTuples = pstream.getEofTuples();
+ assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+ }finally {
+ solrClientCache.close();
+ }
}
@@ -226,12 +250,21 @@ public void testMultipleFqClauses() throws Exception {
streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
- ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
- "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
- List<Tuple> tuples = getTuples(stream);
- assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
- assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
+ "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
+ stream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(stream);
+ assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
+ assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -245,15 +278,20 @@ public void testRankStream() throws Exception {
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
- List<Tuple> tuples = getTuples(rstream);
-
- assertEquals(3, tuples.size());
- assertOrder(tuples, 4,3,2);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+ rstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rstream);
+ assertEquals(3, tuples.size());
+ assertOrder(tuples, 4, 3, 2);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -272,22 +310,30 @@ public void testParallelRankStream() throws Exception {
.add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
- ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+ ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
- assertEquals(10, tuples.size());
- assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+ assertEquals(10, tuples.size());
+ assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
-public void testTrace() throws Exception {
+ public void testTrace() throws Exception {
- new UpdateRequest()
+ new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
.add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
@@ -300,15 +346,24 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //Test with spaces in the parameter lists.
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- stream.setTrace(true);
- List<Tuple> tuples = getTuples(stream);
- assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
- assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
- assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
- assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ //Test with spaces in the parameter lists.
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ stream.setTrace(true);
+ stream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(stream);
+ assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
+ assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
+ assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
+ assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -327,52 +382,60 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //Test with spaces in the parameter lists.
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ReducerStream rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
- List<Tuple> tuples = getTuples(rstream);
-
- assertEquals(3, tuples.size());
-
- Tuple t0 = tuples.get(0);
- List<Map> maps0 = t0.getMaps("group");
- assertMaps(maps0, 0, 2, 1, 9);
-
- Tuple t1 = tuples.get(1);
- List<Map> maps1 = t1.getMaps("group");
- assertMaps(maps1, 3, 5, 7, 8);
-
- Tuple t2 = tuples.get(2);
- List<Map> maps2 = t2.getMaps("group");
- assertMaps(maps2, 4, 6);
-
- //Test with spaces in the parameter lists using a comparator
- sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- rstream = new ReducerStream(stream,
- new FieldComparator("a_s", ComparatorOrder.ASCENDING),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
-
- tuples = getTuples(rstream);
-
- assertEquals(3, tuples.size());
-
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assertMaps(maps0, 9, 1, 2, 0);
-
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 8, 7, 5, 3);
-
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 6, 4);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ //Test with spaces in the parameter lists.
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ReducerStream rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+
+ rstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rstream);
+
+ assertEquals(3, tuples.size());
+
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps("group");
+ assertMaps(maps0, 0, 2, 1, 9);
+
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps("group");
+ assertMaps(maps1, 3, 5, 7, 8);
+
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps("group");
+ assertMaps(maps2, 4, 6);
+
+ //Test with spaces in the parameter lists using a comparator
+ sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ rstream = new ReducerStream(stream,
+ new FieldComparator("a_s", ComparatorOrder.ASCENDING),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+ rstream.setStreamContext(streamContext);
+ tuples = getTuples(rstream);
+
+ assertEquals(3, tuples.size());
+
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assertMaps(maps0, 9, 1, 2, 0);
+
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 8, 7, 5, 3);
+
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 6, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -392,17 +455,24 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //Test with spaces in the parameter lists.
- SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ReducerStream rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
- List<Tuple> tuples = getTuples(rstream);
-
- assertEquals(0, tuples.size());
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ //Test with spaces in the parameter lists.
+ SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ReducerStream rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+ rstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rstream);
+
+ assertEquals(0, tuples.size());
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -421,56 +491,65 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
- ReducerStream rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
- ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- assertEquals(3, tuples.size());
+ ReducerStream rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+ ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
- Tuple t0 = tuples.get(0);
- List<Map> maps0 = t0.getMaps("group");
- assertMaps(maps0, 9, 1, 2, 0);
+ assertEquals(3, tuples.size());
- Tuple t1 = tuples.get(1);
- List<Map> maps1 = t1.getMaps("group");
- assertMaps(maps1, 8, 7, 5, 3);
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps("group");
+ assertMaps(maps0, 9, 1, 2, 0);
- Tuple t2 = tuples.get(2);
- List<Map> maps2 = t2.getMaps("group");
- assertMaps(maps2, 6, 4);
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps("group");
+ assertMaps(maps1, 8, 7, 5, 3);
- //Test Descending with Ascending subsort
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps("group");
+ assertMaps(maps2, 6, 4);
- sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ //Test Descending with Ascending subsort
- rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
- pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
- attachStreamFactory(pstream);
- tuples = getTuples(pstream);
+ sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- assertEquals(3, tuples.size());
+ rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
+ pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ tuples = getTuples(pstream);
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assertMaps(maps0, 4, 6);
+ assertEquals(3, tuples.size());
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 3, 5, 7);
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assertMaps(maps0, 4, 6);
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 0, 2, 1);
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 3, 5, 7);
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 0, 2, 1);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -490,24 +569,33 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
//Test an error that originates from the /select handler
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ExceptionStream estream = new ExceptionStream(stream);
- Tuple t = getTuple(estream);
- assertTrue(t.EOF);
- assertTrue(t.EXCEPTION);
- assertTrue(t.getException().contains("sort param field can't be found: blah"));
-
- //Test an error that comes originates from the /export handler
- sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- estream = new ExceptionStream(stream);
- t = getTuple(estream);
- assertTrue(t.EOF);
- assertTrue(t.EXCEPTION);
- //The /export handler will pass through a real exception.
- assertTrue(t.getException().contains("undefined field:"));
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ExceptionStream estream = new ExceptionStream(stream);
+ estream.setStreamContext(streamContext);
+ Tuple t = getTuple(estream);
+ assertTrue(t.EOF);
+ assertTrue(t.EXCEPTION);
+ assertTrue(t.getException().contains("sort param field can't be found: blah"));
+
+ //Test an error that originates from the /export handler
+ sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ estream = new ExceptionStream(stream);
+ estream.setStreamContext(streamContext);
+ t = getTuple(estream);
+ assertTrue(t.EOF);
+ assertTrue(t.EXCEPTION);
+ //The /export handler will pass through a real exception.
+ assertTrue(t.getException().contains("undefined field:"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -577,48 +665,55 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*");
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
-
- List<Tuple> tuples = getTuples(statsStream);
-
- assertEquals(1, tuples.size());
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
-
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals(70, sumi.longValue());
- assertEquals(55.0, sumf.doubleValue(), 0.01);
- assertEquals(0.0, mini.doubleValue(), 0.01);
- assertEquals(1.0, minf.doubleValue(), 0.01);
- assertEquals(14.0, maxi.doubleValue(), 0.01);
- assertEquals(10.0, maxf.doubleValue(), 0.01);
- assertEquals(7.0, avgi.doubleValue(), .01);
- assertEquals(5.5, avgf.doubleValue(), .001);
- assertEquals(10, count.doubleValue(), .01);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*");
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
+ statsStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(statsStream);
+
+ assertEquals(1, tuples.size());
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals(70, sumi.longValue());
+ assertEquals(55.0, sumf.doubleValue(), 0.01);
+ assertEquals(0.0, mini.doubleValue(), 0.01);
+ assertEquals(1.0, minf.doubleValue(), 0.01);
+ assertEquals(14.0, maxi.doubleValue(), 0.01);
+ assertEquals(10.0, maxf.doubleValue(), 0.01);
+ assertEquals(7.0, avgi.doubleValue(), .01);
+ assertEquals(5.5, avgf.doubleValue(), .001);
+ assertEquals(10, count.doubleValue(), .01);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -637,344 +732,352 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
-
- Bucket[] buckets = {new Bucket("a_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
- ComparatorOrder.ASCENDING)};
-
- FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- List<Tuple> tuples = getTuples(facetStream);
-
- assert(tuples.size() == 3);
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11.0, sumf.doubleValue(), 0.01);
- assertEquals(4.0, mini.doubleValue(), 0.01);
- assertEquals(4.0, minf.doubleValue(), 0.01);
- assertEquals(11.0, maxi.doubleValue(), 0.01);
- assertEquals(7.0, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), .01);
- assertEquals(18, sumf.doubleValue(), .01);
- assertEquals(0.0, mini.doubleValue(), .01);
- assertEquals(1.0, minf.doubleValue(), .01);
- assertEquals(14.0, maxi.doubleValue(), .01);
- assertEquals(10.0, maxf.doubleValue(), .01);
- assertEquals(4.25, avgi.doubleValue(), .01);
- assertEquals(4.5, avgf.doubleValue(), .01);
- assertEquals(4, count.doubleValue(), .01);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38.0, sumi.doubleValue(), 0.01);
- assertEquals(26.0, sumf.doubleValue(), 0.01);
- assertEquals(3.0, mini.doubleValue(), 0.01);
- assertEquals(3.0, minf.doubleValue(), 0.01);
- assertEquals(13.0, maxi.doubleValue(), 0.01);
- assertEquals(9.0, maxf.doubleValue(), 0.01);
- assertEquals(9.5, avgi.doubleValue(), 0.01);
- assertEquals(6.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
-
- //Reverse the Sort.
-
- sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
-
- facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- tuples = getTuples(facetStream);
-
- assertEquals(3, tuples.size());
-
- //Test Long and Double Sums
-
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.1);
- assertEquals(26, sumf.doubleValue(), 0.1);
- assertEquals(3, mini.doubleValue(), 0.1);
- assertEquals(3, minf.doubleValue(), 0.1);
- assertEquals(13, maxi.doubleValue(), 0.1);
- assertEquals(9, maxf.doubleValue(), 0.1);
- assertEquals(9.5, avgi.doubleValue(), 0.1);
- assertEquals(6.5, avgf.doubleValue(), 0.1);
- assertEquals(4, count.doubleValue(), 0.1);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.01);
- assertEquals(18, sumf.doubleValue(), 0.01);
- assertEquals(0, mini.doubleValue(), 0.01);
- assertEquals(1, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(4.25, avgi.doubleValue(), 0.01);
- assertEquals(4.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.01);
- assertEquals(4.0, mini.doubleValue(), 0.01);
- assertEquals(4.0, minf.doubleValue(), 0.01);
- assertEquals(11.0, maxi.doubleValue(), 0.01);
- assertEquals(7.0, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
-
- //Test index sort
-
- sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
-
-
- facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- tuples = getTuples(facetStream);
-
- assertEquals(3, tuples.size());
-
-
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.01);
- assertEquals(4, mini.doubleValue(), 0.01);
- assertEquals(4, minf.doubleValue(), 0.01);
- assertEquals(11, maxi.doubleValue(), 0.01);
- assertEquals(7, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertTrue(bucket.equals("hello3"));
- assertTrue(sumi.doubleValue() == 38.0D);
- assertTrue(sumf.doubleValue() == 26.0D);
- assertTrue(mini.doubleValue() == 3.0D);
- assertTrue(minf.doubleValue() == 3.0D);
- assertTrue(maxi.doubleValue() == 13.0D);
- assertTrue(maxf.doubleValue() == 9.0D);
- assertTrue(avgi.doubleValue() == 9.5D);
- assertTrue(avgf.doubleValue() == 6.5D);
- assertTrue(count.doubleValue() == 4);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.01);
- assertEquals(18, sumf.doubleValue(), 0.01);
- assertEquals(0, mini.doubleValue(), 0.01);
- assertEquals(1, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(4.25, avgi.doubleValue(), 0.01);
- assertEquals(4.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
- //Test index sort
-
- sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
-
- facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- tuples = getTuples(facetStream);
-
- assertEquals(3, tuples.size());
-
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.01);
- assertEquals(18, sumf.doubleValue(), 0.01);
- assertEquals(0, mini.doubleValue(), 0.01);
- assertEquals(1, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(4.25, avgi.doubleValue(), 0.0001);
- assertEquals(4.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.01);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.01);
- assertEquals(26, sumf.doubleValue(), 0.01);
- assertEquals(3, mini.doubleValue(), 0.01);
- assertEquals(3, minf.doubleValue(), 0.01);
- assertEquals(13, maxi.doubleValue(), 0.01);
- assertEquals(9, maxf.doubleValue(), 0.01);
- assertEquals(9.5, avgi.doubleValue(), 0.01);
- assertEquals(6.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11.0, sumf.doubleValue(), 0.1);
- assertEquals(4.0, mini.doubleValue(), 0.1);
- assertEquals(4.0, minf.doubleValue(), 0.1);
- assertEquals(11.0, maxi.doubleValue(), 0.1);
- assertEquals(7.0, maxf.doubleValue(), 0.1);
- assertEquals(7.5, avgi.doubleValue(), 0.1);
- assertEquals(5.5, avgf.doubleValue(), 0.1);
- assertEquals(2, count.doubleValue(), 0.1);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
+ ComparatorOrder.ASCENDING)};
+
+ FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+ facetStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11.0, sumf.doubleValue(), 0.01);
+ assertEquals(4.0, mini.doubleValue(), 0.01);
+ assertEquals(4.0, minf.doubleValue(), 0.01);
+ assertEquals(11.0, maxi.doubleValue(), 0.01);
+ assertEquals(7.0, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), .01);
+ assertEquals(18, sumf.doubleValue(), .01);
+ assertEquals(0.0, mini.doubleValue(), .01);
+ assertEquals(1.0, minf.doubleValue(), .01);
+ assertEquals(14.0, maxi.doubleValue(), .01);
+ assertEquals(10.0, maxf.doubleValue(), .01);
+ assertEquals(4.25, avgi.doubleValue(), .01);
+ assertEquals(4.5, avgf.doubleValue(), .01);
+ assertEquals(4, count.doubleValue(), .01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38.0, sumi.doubleValue(), 0.01);
+ assertEquals(26.0, sumf.doubleValue(), 0.01);
+ assertEquals(3.0, mini.doubleValue(), 0.01);
+ assertEquals(3.0, minf.doubleValue(), 0.01);
+ assertEquals(13.0, maxi.doubleValue(), 0.01);
+ assertEquals(9.0, maxf.doubleValue(), 0.01);
+ assertEquals(9.5, avgi.doubleValue(), 0.01);
+ assertEquals(6.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+
+ //Reverse the Sort.
+
+ sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
+
+ facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+ facetStream.setStreamContext(streamContext);
+ tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+ //Test Long and Double Sums
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.1);
+ assertEquals(26, sumf.doubleValue(), 0.1);
+ assertEquals(3, mini.doubleValue(), 0.1);
+ assertEquals(3, minf.doubleValue(), 0.1);
+ assertEquals(13, maxi.doubleValue(), 0.1);
+ assertEquals(9, maxf.doubleValue(), 0.1);
+ assertEquals(9.5, avgi.doubleValue(), 0.1);
+ assertEquals(6.5, avgf.doubleValue(), 0.1);
+ assertEquals(4, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.01);
+ assertEquals(18, sumf.doubleValue(), 0.01);
+ assertEquals(0, mini.doubleValue(), 0.01);
+ assertEquals(1, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(4.25, avgi.doubleValue(), 0.01);
+ assertEquals(4.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11, sumf.doubleValue(), 0.01);
+ assertEquals(4.0, mini.doubleValue(), 0.01);
+ assertEquals(4.0, minf.doubleValue(), 0.01);
+ assertEquals(11.0, maxi.doubleValue(), 0.01);
+ assertEquals(7.0, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+
+ //Test index sort
+
+ sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
+
+
+ facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+ facetStream.setStreamContext(streamContext);
+
+ tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11, sumf.doubleValue(), 0.01);
+ assertEquals(4, mini.doubleValue(), 0.01);
+ assertEquals(4, minf.doubleValue(), 0.01);
+ assertEquals(11, maxi.doubleValue(), 0.01);
+ assertEquals(7, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.01);
+ assertEquals(26, sumf.doubleValue(), 0.01);
+ assertEquals(3, mini.doubleValue(), 0.01);
+ assertEquals(3, minf.doubleValue(), 0.01);
+ assertEquals(13, maxi.doubleValue(), 0.01);
+ assertEquals(9, maxf.doubleValue(), 0.01);
+ assertEquals(9.5, avgi.doubleValue(), 0.01);
+ assertEquals(6.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.01);
+ assertEquals(18, sumf.doubleValue(), 0.01);
+ assertEquals(0, mini.doubleValue(), 0.01);
+ assertEquals(1, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(4.25, avgi.doubleValue(), 0.01);
+ assertEquals(4.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ //Test index sort
+
+ sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
+
+ facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+ facetStream.setStreamContext(streamContext);
+ tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.01);
+ assertEquals(18, sumf.doubleValue(), 0.01);
+ assertEquals(0, mini.doubleValue(), 0.01);
+ assertEquals(1, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(4.25, avgi.doubleValue(), 0.0001);
+ assertEquals(4.5, avgf.doubleValue(), 0.001);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.01);
+ assertEquals(26, sumf.doubleValue(), 0.01);
+ assertEquals(3, mini.doubleValue(), 0.01);
+ assertEquals(3, minf.doubleValue(), 0.01);
+ assertEquals(13, maxi.doubleValue(), 0.01);
+ assertEquals(9, maxf.doubleValue(), 0.01);
+ assertEquals(9.5, avgi.doubleValue(), 0.01);
+ assertEquals(6.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11.0, sumf.doubleValue(), 0.1);
+ assertEquals(4.0, mini.doubleValue(), 0.1);
+ assertEquals(4.0, minf.doubleValue(), 0.1);
+ assertEquals(11.0, maxi.doubleValue(), 0.1);
+ assertEquals(7.0, maxf.doubleValue(), 0.1);
+ assertEquals(7.5, avgi.doubleValue(), 0.1);
+ assertEquals(5.5, avgf.doubleValue(), 0.1);
+ assertEquals(2, count.doubleValue(), 0.1);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1042,7 +1145,11 @@ public void testTrace() throws Exception {
List<String> selectOrder = ("asc".equals(sortDir)) ? Arrays.asList(ascOrder) : Arrays.asList(descOrder);
List<String> selectOrderBool = ("asc".equals(sortDir)) ? Arrays.asList(ascOrderBool) : Arrays.asList(descOrderBool);
SolrParams exportParams = mapParams("q", "*:*", "qt", "/export", "fl", "id," + field, "sort", field + " " + sortDir + ",id asc");
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) {
+ solrStream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(solrStream);
assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
// Since the getTuples method doesn't return the EOF tuple, these two entries should be the same size.
@@ -1053,6 +1160,8 @@ public void testTrace() throws Exception {
"' RESTORE GETTING selectOrder from select statement after LUCENE-7548",
tuples.get(idx).getString("id"), (field.startsWith("b_") ? selectOrderBool.get(idx) : selectOrder.get(idx)));
}
+ } finally {
+ solrClientCache.close();
}
}
@@ -1081,7 +1190,12 @@ public void testTrace() throws Exception {
}
SolrParams sParams = mapParams("q", "*:*", "qt", "/export", "fl", fl.toString(), "sort", "id asc");
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
+ solrStream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(solrStream);
assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
@@ -1097,6 +1211,8 @@ public void testTrace() throws Exception {
}
}
}
+ } finally {
+ solrClientCache.close();
}
}
@@ -1229,173 +1345,181 @@ public void testTrace() throws Exception {
.add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
-
- Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new CountMetric()};
-
- FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
-
- FacetStream facetStream = new FacetStream(
- zkHost,
- COLLECTIONORALIAS,
- sParamsA,
- buckets,
- metrics,
- sorts,
- 100);
-
- List<Tuple> tuples = getTuples(facetStream);
- assertEquals(6, tuples.size());
-
- Tuple tuple = tuples.get(0);
- String bucket1 = tuple.getString("level1_s");
- String bucket2 = tuple.getString("level2_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("b", bucket2);
- assertEquals(35, sumi.longValue());
- assertEquals(3, count, 0.1);
-
- tuple = tuples.get(1);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("b", bucket2);
- assertEquals(15, sumi.longValue());
- assertEquals(2, count, 0.1);
-
- tuple = tuples.get(2);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("b", bucket2);
- assertEquals(11, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(3);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("a", bucket2);
- assertEquals(4, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(4);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("a", bucket2);
- assertEquals(3, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(5);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("a", bucket2);
- assertEquals(2, sumi.longValue());
- assertEquals(2, count.doubleValue(), 0.1);
-
- sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING );
- sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
- facetStream = new FacetStream(
- zkHost,
- COLLECTIONORALIAS,
- sParamsA,
- buckets,
- metrics,
- sorts,
- 100);
-
- tuples = getTuples(facetStream);
- assertEquals(6, tuples.size());
-
- tuple = tuples.get(0);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("b", bucket2);
- assertEquals(11, sumi.longValue());
- assertEquals(1, count, 0.1);
-
- tuple = tuples.get(1);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("a", bucket2);
- assertEquals(4, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(2);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("b", bucket2);
- assertEquals(35, sumi.longValue());
- assertEquals(3, count.doubleValue(), 0.1);
-
- tuple = tuples.get(3);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("a", bucket2);
- assertEquals(3, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(4);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("b", bucket2);
- assertEquals(15, sumi.longValue());
- assertEquals(2, count.doubleValue(), 0.1);
-
- tuple = tuples.get(5);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("a", bucket2);
- assertEquals(2, sumi.longValue());
- assertEquals(2, count.doubleValue(), 0.1);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
+
+ Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new CountMetric()};
+
+ FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
+
+ FacetStream facetStream = new FacetStream(
+ zkHost,
+ COLLECTIONORALIAS,
+ sParamsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+ facetStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(facetStream);
+ assertEquals(6, tuples.size());
+
+ Tuple tuple = tuples.get(0);
+ String bucket1 = tuple.getString("level1_s");
+ String bucket2 = tuple.getString("level2_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(35, sumi.longValue());
+ assertEquals(3, count, 0.1);
+
+ tuple = tuples.get(1);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(15, sumi.longValue());
+ assertEquals(2, count, 0.1);
+
+ tuple = tuples.get(2);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(11, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(3);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(4, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(4);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(3, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(5);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(2, sumi.longValue());
+ assertEquals(2, count.doubleValue(), 0.1);
+
+ sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING);
+ sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING);
+ facetStream = new FacetStream(
+ zkHost,
+ COLLECTIONORALIAS,
+ sParamsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+ facetStream.setStreamContext(streamContext);
+ tuples = getTuples(facetStream);
+ assertEquals(6, tuples.size());
+
+ tuple = tuples.get(0);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(11, sumi.longValue());
+ assertEquals(1, count, 0.1);
+
+ tuple = tuples.get(1);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(4, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(2);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(35, sumi.longValue());
+ assertEquals(3, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(3);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(3, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(4);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(15, sumi.longValue());
+ assertEquals(2, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(5);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(2, sumi.longValue());
+ assertEquals(2, count.doubleValue(), 0.1);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1413,166 +1537,174 @@ public void testTrace() throws Exception {
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
- Bucket[] buckets = {new Bucket("a_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
- List<Tuple> tuples = getTuples(rollupStream);
-
- assert(tuples.size() == 3);
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.001);
- assertEquals(18, sumf.doubleValue(), 0.001);
- assertEquals(0, mini.doubleValue(), 0.001);
- assertEquals(1, minf.doubleValue(), 0.001);
- assertEquals(14, maxi.doubleValue(), 0.001);
- assertEquals(10, maxf.doubleValue(), 0.001);
- assertEquals(4.25, avgi.doubleValue(), 0.001);
- assertEquals(4.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.001);
- assertEquals(26, sumf.doubleValue(), 0.001);
- assertEquals(3, mini.doubleValue(), 0.001);
- assertEquals(3, minf.doubleValue(), 0.001);
- assertEquals(13, maxi.doubleValue(), 0.001);
- assertEquals(9, maxf.doubleValue(), 0.001);
- assertEquals(9.5, avgi.doubleValue(), 0.001);
- assertEquals(6.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.01);
- assertEquals(4, mini.doubleValue(), 0.01);
- assertEquals(4, minf.doubleValue(), 0.01);
- assertEquals(11, maxi.doubleValue(), 0.01);
- assertEquals(7, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
- // Test will null metrics
- rollupStream = new RollupStream(stream, buckets, metrics);
- tuples = getTuples(rollupStream);
-
- assert(tuples.size() == 3);
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- assertTrue(bucket.equals("hello0"));
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- assertTrue(bucket.equals("hello3"));
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- assertTrue(bucket.equals("hello4"));
-
-
- //Test will null value in the grouping field
- new UpdateRequest()
- .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
- Bucket[] buckets1 = {new Bucket("a_s")};
-
- Metric[] metrics1 = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- rollupStream = new RollupStream(stream, buckets1, metrics1);
- tuples = getTuples(rollupStream);
- //Check that we've got the extra NULL bucket
- assertEquals(4, tuples.size());
- tuple = tuples.get(0);
- assertEquals("NULL", tuple.getString("a_s"));
-
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals(14, sumi.doubleValue(), 0.01);
- assertEquals(10, sumf.doubleValue(), 0.01);
- assertEquals(14, mini.doubleValue(), 0.01);
- assertEquals(10, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(14, avgi.doubleValue(), 0.01);
- assertEquals(10, avgf.doubleValue(), 0.01);
- assertEquals(1, count.doubleValue(), 0.01);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+ rollupStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rollupStream);
+
+ assert (tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.001);
+ assertEquals(18, sumf.doubleValue(), 0.001);
+ assertEquals(0, mini.doubleValue(), 0.001);
+ assertEquals(1, minf.doubleValue(), 0.001);
+ assertEquals(14, maxi.doubleValue(), 0.001);
+ assertEquals(10, maxf.doubleValue(), 0.001);
+ assertEquals(4.25, avgi.doubleValue(), 0.001);
+ assertEquals(4.5, avgf.doubleValue(), 0.001);
+ assertEquals(4, count.doubleValue(), 0.001);
+
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.001);
+ assertEquals(26, sumf.doubleValue(), 0.001);
+ assertEquals(3, mini.doubleValue(), 0.001);
+ assertEquals(3, minf.doubleValue(), 0.001);
+ assertEquals(13, maxi.doubleValue(), 0.001);
+ assertEquals(9, maxf.doubleValue(), 0.001);
+ assertEquals(9.5, avgi.doubleValue(), 0.001);
+ assertEquals(6.5, avgf.doubleValue(), 0.001);
+ assertEquals(4, count.doubleValue(), 0.001);
+
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11, sumf.doubleValue(), 0.01);
+ assertEquals(4, mini.doubleValue(), 0.01);
+ assertEquals(4, minf.doubleValue(), 0.01);
+ assertEquals(11, maxi.doubleValue(), 0.01);
+ assertEquals(7, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+ // Test with null metrics
+ rollupStream = new RollupStream(stream, buckets, metrics);
+ rollupStream.setStreamContext(streamContext);
+ tuples = getTuples(rollupStream);
+
+ assert (tuples.size() == 3);
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ assertTrue(bucket.equals("hello0"));
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ assertTrue(bucket.equals("hello3"));
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ assertTrue(bucket.equals("hello4"));
+
+
+ //Test with null value in the grouping field
+ new UpdateRequest()
+ .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ Bucket[] buckets1 = {new Bucket("a_s")};
+
+ Metric[] metrics1 = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ rollupStream = new RollupStream(stream, buckets1, metrics1);
+ rollupStream.setStreamContext(streamContext);
+ tuples = getTuples(rollupStream);
+ //Check that we've got the extra NULL bucket
+ assertEquals(4, tuples.size());
+ tuple = tuples.get(0);
+ assertEquals("NULL", tuple.getString("a_s"));
+
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals(14, sumi.doubleValue(), 0.01);
+ assertEquals(10, sumf.doubleValue(), 0.01);
+ assertEquals(14, mini.doubleValue(), 0.01);
+ assertEquals(10, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(14, avgi.doubleValue(), 0.01);
+ assertEquals(10, avgf.doubleValue(), 0.01);
+ assertEquals(1, count.doubleValue(), 0.01);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1583,66 +1715,71 @@ public void testTrace() throws Exception {
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
- SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
+ try {
+ SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
- TopicStream topicStream = new TopicStream(zkHost,
- COLLECTIONORALIAS,
- COLLECTIONORALIAS,
- "50000000",
- -1,
- 1000000, sParams);
+ TopicStream topicStream = new TopicStream(zkHost,
+ COLLECTIONORALIAS,
+ COLLECTIONORALIAS,
+ "50000000",
+ -1,
+ 1000000, sParams);
- DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
- daemonStream.setStreamContext(context);
+ DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
+ daemonStream.setStreamContext(context);
- daemonStream.open();
+ daemonStream.open();
- // Wait for the checkpoint
- JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
+ // Wait for the checkpoint
+ JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
- SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
- int count = 0;
- while(count == 0) {
- SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
- List<Tuple> tuples = getTuples(solrStream);
- count = tuples.size();
- if(count > 0) {
- Tuple t = tuples.get(0);
- assertTrue(t.getLong("id") == 50000000);
- } else {
- System.out.println("###### Waiting for checkpoint #######:" + count);
+ SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
+ int count = 0;
+ while (count == 0) {
+ SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
+ solrStream.setStreamContext(context);
+ List<Tuple> tuples = getTuples(solrStream);
+ count = tuples.size();
+ if (count > 0) {
+ Tuple t = tuples.get(0);
+ assertTrue(t.getLong("id") == 50000000);
+ } else {
+ System.out.println("###### Waiting for checkpoint #######:" + count);
+ }
}
- }
- new UpdateRequest()
- .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
- .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
- .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
- .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+ .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+ .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+ .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- for(int i=0; i<5; i++) {
- daemonStream.read();
- }
+ for (int i = 0; i < 5; i++) {
+ daemonStream.read();
+ }
- new UpdateRequest()
- .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ new UpdateRequest()
+ .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- for(int i=0; i<2; i++) {
- daemonStream.read();
- }
+ for (int i = 0; i < 2; i++) {
+ daemonStream.read();
+ }
+
+ daemonStream.shutdown();
- daemonStream.shutdown();
+ Tuple tuple = daemonStream.read();
- Tuple tuple = daemonStream.read();
+ assertTrue(tuple.EOF);
+ daemonStream.close();
+ } finally {
+ cache.close();
+ }
- assertTrue(tuple.EOF);
- daemonStream.close();
- cache.close();
}
@@ -1662,99 +1799,107 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
- Bucket[] buckets = {new Bucket("a_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
- ParallelStream parallelStream = parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- attachStreamFactory(parallelStream);
- List<Tuple> tuples = getTuples(parallelStream);
-
- assertEquals(3, tuples.size());
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.001);
- assertEquals(18, sumf.doubleValue(), 0.001);
- assertEquals(0, mini.doubleValue(), 0.001);
- assertEquals(1, minf.doubleValue(), 0.001);
- assertEquals(14, maxi.doubleValue(), 0.001);
- assertEquals(10, maxf.doubleValue(), 0.001);
- assertEquals(4.25, avgi.doubleValue(), 0.001);
- assertEquals(4.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.001);
- assertEquals(26, sumf.doubleValue(), 0.001);
- assertEquals(3, mini.doubleValue(), 0.001);
- assertEquals(3, minf.doubleValue(), 0.001);
- assertEquals(13, maxi.doubleValue(), 0.001);
- assertEquals(9, maxf.doubleValue(), 0.001);
- assertEquals(9.5, avgi.doubleValue(), 0.001);
- assertEquals(6.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.001);
- assertEq
<TRUNCATED>