You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2017/04/12 02:47:21 UTC
[1/4] lucene-solr:branch_6x: SOLR-10274: Fix backport
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x 740d96767 -> 56e3bc7a5
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index eeb41ae..711fb3d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -131,13 +131,20 @@ public void testUniqueStream() throws Exception {
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
- List<Tuple> tuples = getTuples(ustream);
- assertEquals(4, tuples.size());
- assertOrder(tuples, 0,1,3,4);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+ ustream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(ustream);
+ assertEquals(4, tuples.size());
+ assertOrder(tuples, 0, 1, 3, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -167,15 +174,22 @@ public void testNonePartitionKeys() throws Exception {
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
- SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
-
- assert(tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+ SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
+ assert (tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -193,19 +207,29 @@ public void testParallelUniqueStream() throws Exception {
.add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
- ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
- assertEquals(5, tuples.size());
- assertOrder(tuples, 0, 1, 3, 4, 6);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
- //Test the eofTuples
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
+ ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
+ assertEquals(5, tuples.size());
+ assertOrder(tuples, 0, 1, 3, 4, 6);
- Map<String,Tuple> eofTuples = pstream.getEofTuples();
- assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+ //Test the eofTuples
+
+ Map<String, Tuple> eofTuples = pstream.getEofTuples();
+ assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
+ }finally {
+ solrClientCache.close();
+ }
}
@@ -226,12 +250,21 @@ public void testMultipleFqClauses() throws Exception {
streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
- ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
- "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
- List<Tuple> tuples = getTuples(stream);
- assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
- assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
+ "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
+ stream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(stream);
+ assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
+ assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -245,15 +278,20 @@ public void testRankStream() throws Exception {
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
- List<Tuple> tuples = getTuples(rstream);
-
- assertEquals(3, tuples.size());
- assertOrder(tuples, 4,3,2);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+ rstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rstream);
+ assertEquals(3, tuples.size());
+ assertOrder(tuples, 4, 3, 2);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -272,22 +310,30 @@ public void testParallelRankStream() throws Exception {
.add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
- RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
- ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
+ RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+ ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
- assertEquals(10, tuples.size());
- assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+ assertEquals(10, tuples.size());
+ assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
-public void testTrace() throws Exception {
+ public void testTrace() throws Exception {
- new UpdateRequest()
+ new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
.add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
@@ -300,15 +346,24 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //Test with spaces in the parameter lists.
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- stream.setTrace(true);
- List<Tuple> tuples = getTuples(stream);
- assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
- assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
- assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
- assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ //Test with spaces in the parameter lists.
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ stream.setTrace(true);
+ stream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(stream);
+ assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
+ assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
+ assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
+ assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -327,52 +382,60 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //Test with spaces in the parameter lists.
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ReducerStream rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
- List<Tuple> tuples = getTuples(rstream);
-
- assertEquals(3, tuples.size());
-
- Tuple t0 = tuples.get(0);
- List<Map> maps0 = t0.getMaps("group");
- assertMaps(maps0, 0, 2, 1, 9);
-
- Tuple t1 = tuples.get(1);
- List<Map> maps1 = t1.getMaps("group");
- assertMaps(maps1, 3, 5, 7, 8);
-
- Tuple t2 = tuples.get(2);
- List<Map> maps2 = t2.getMaps("group");
- assertMaps(maps2, 4, 6);
-
- //Test with spaces in the parameter lists using a comparator
- sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- rstream = new ReducerStream(stream,
- new FieldComparator("a_s", ComparatorOrder.ASCENDING),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
-
- tuples = getTuples(rstream);
-
- assertEquals(3, tuples.size());
-
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assertMaps(maps0, 9, 1, 2, 0);
-
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 8, 7, 5, 3);
-
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 6, 4);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ //Test with spaces in the parameter lists.
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ReducerStream rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+
+ rstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rstream);
+
+ assertEquals(3, tuples.size());
+
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps("group");
+ assertMaps(maps0, 0, 2, 1, 9);
+
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps("group");
+ assertMaps(maps1, 3, 5, 7, 8);
+
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps("group");
+ assertMaps(maps2, 4, 6);
+
+ //Test with spaces in the parameter lists using a comparator
+ sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ rstream = new ReducerStream(stream,
+ new FieldComparator("a_s", ComparatorOrder.ASCENDING),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+ rstream.setStreamContext(streamContext);
+ tuples = getTuples(rstream);
+
+ assertEquals(3, tuples.size());
+
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assertMaps(maps0, 9, 1, 2, 0);
+
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 8, 7, 5, 3);
+
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 6, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -392,17 +455,24 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //Test with spaces in the parameter lists.
- SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ReducerStream rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
-
- List<Tuple> tuples = getTuples(rstream);
-
- assertEquals(0, tuples.size());
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ //Test with spaces in the parameter lists.
+ SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ReducerStream rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
+ rstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rstream);
+
+ assertEquals(0, tuples.size());
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -421,56 +491,65 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
- ReducerStream rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
- ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- attachStreamFactory(pstream);
- List<Tuple> tuples = getTuples(pstream);
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- assertEquals(3, tuples.size());
+ ReducerStream rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
+ ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
- Tuple t0 = tuples.get(0);
- List<Map> maps0 = t0.getMaps("group");
- assertMaps(maps0, 9, 1, 2, 0);
+ assertEquals(3, tuples.size());
- Tuple t1 = tuples.get(1);
- List<Map> maps1 = t1.getMaps("group");
- assertMaps(maps1, 8, 7, 5, 3);
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps("group");
+ assertMaps(maps0, 9, 1, 2, 0);
- Tuple t2 = tuples.get(2);
- List<Map> maps2 = t2.getMaps("group");
- assertMaps(maps2, 6, 4);
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps("group");
+ assertMaps(maps1, 8, 7, 5, 3);
- //Test Descending with Ascending subsort
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps("group");
+ assertMaps(maps2, 6, 4);
- sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ //Test Descending with Ascending subsort
- rstream = new ReducerStream(stream,
- new FieldEqualitor("a_s"),
- new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
- pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
- attachStreamFactory(pstream);
- tuples = getTuples(pstream);
+ sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- assertEquals(3, tuples.size());
+ rstream = new ReducerStream(stream,
+ new FieldEqualitor("a_s"),
+ new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
+ pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
+ pstream.setStreamContext(streamContext);
+ tuples = getTuples(pstream);
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assertMaps(maps0, 4, 6);
+ assertEquals(3, tuples.size());
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 3, 5, 7);
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assertMaps(maps0, 4, 6);
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 0, 2, 1);
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 3, 5, 7);
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 0, 2, 1);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -490,24 +569,33 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
//Test an error that comes originates from the /select handler
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- ExceptionStream estream = new ExceptionStream(stream);
- Tuple t = getTuple(estream);
- assertTrue(t.EOF);
- assertTrue(t.EXCEPTION);
- assertTrue(t.getException().contains("sort param field can't be found: blah"));
-
- //Test an error that comes originates from the /export handler
- sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
- estream = new ExceptionStream(stream);
- t = getTuple(estream);
- assertTrue(t.EOF);
- assertTrue(t.EXCEPTION);
- //The /export handler will pass through a real exception.
- assertTrue(t.getException().contains("undefined field:"));
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ ExceptionStream estream = new ExceptionStream(stream);
+ estream.setStreamContext(streamContext);
+ Tuple t = getTuple(estream);
+ assertTrue(t.EOF);
+ assertTrue(t.EXCEPTION);
+ assertTrue(t.getException().contains("sort param field can't be found: blah"));
+
+ //Test an error that comes originates from the /export handler
+ sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ estream = new ExceptionStream(stream);
+ estream.setStreamContext(streamContext);
+ t = getTuple(estream);
+ assertTrue(t.EOF);
+ assertTrue(t.EXCEPTION);
+ //The /export handler will pass through a real exception.
+ assertTrue(t.getException().contains("undefined field:"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -577,48 +665,55 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*");
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
-
- List<Tuple> tuples = getTuples(statsStream);
-
- assertEquals(1, tuples.size());
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
-
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals(70, sumi.longValue());
- assertEquals(55.0, sumf.doubleValue(), 0.01);
- assertEquals(0.0, mini.doubleValue(), 0.01);
- assertEquals(1.0, minf.doubleValue(), 0.01);
- assertEquals(14.0, maxi.doubleValue(), 0.01);
- assertEquals(10.0, maxf.doubleValue(), 0.01);
- assertEquals(7.0, avgi.doubleValue(), .01);
- assertEquals(5.5, avgf.doubleValue(), .001);
- assertEquals(10, count.doubleValue(), .01);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*");
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
+ statsStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(statsStream);
+
+ assertEquals(1, tuples.size());
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals(70, sumi.longValue());
+ assertEquals(55.0, sumf.doubleValue(), 0.01);
+ assertEquals(0.0, mini.doubleValue(), 0.01);
+ assertEquals(1.0, minf.doubleValue(), 0.01);
+ assertEquals(14.0, maxi.doubleValue(), 0.01);
+ assertEquals(10.0, maxf.doubleValue(), 0.01);
+ assertEquals(7.0, avgi.doubleValue(), .01);
+ assertEquals(5.5, avgf.doubleValue(), .001);
+ assertEquals(10, count.doubleValue(), .01);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -637,344 +732,352 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
-
- Bucket[] buckets = {new Bucket("a_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
- ComparatorOrder.ASCENDING)};
-
- FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- List<Tuple> tuples = getTuples(facetStream);
-
- assert(tuples.size() == 3);
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11.0, sumf.doubleValue(), 0.01);
- assertEquals(4.0, mini.doubleValue(), 0.01);
- assertEquals(4.0, minf.doubleValue(), 0.01);
- assertEquals(11.0, maxi.doubleValue(), 0.01);
- assertEquals(7.0, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), .01);
- assertEquals(18, sumf.doubleValue(), .01);
- assertEquals(0.0, mini.doubleValue(), .01);
- assertEquals(1.0, minf.doubleValue(), .01);
- assertEquals(14.0, maxi.doubleValue(), .01);
- assertEquals(10.0, maxf.doubleValue(), .01);
- assertEquals(4.25, avgi.doubleValue(), .01);
- assertEquals(4.5, avgf.doubleValue(), .01);
- assertEquals(4, count.doubleValue(), .01);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38.0, sumi.doubleValue(), 0.01);
- assertEquals(26.0, sumf.doubleValue(), 0.01);
- assertEquals(3.0, mini.doubleValue(), 0.01);
- assertEquals(3.0, minf.doubleValue(), 0.01);
- assertEquals(13.0, maxi.doubleValue(), 0.01);
- assertEquals(9.0, maxf.doubleValue(), 0.01);
- assertEquals(9.5, avgi.doubleValue(), 0.01);
- assertEquals(6.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
-
- //Reverse the Sort.
-
- sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
-
- facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- tuples = getTuples(facetStream);
-
- assertEquals(3, tuples.size());
-
- //Test Long and Double Sums
-
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.1);
- assertEquals(26, sumf.doubleValue(), 0.1);
- assertEquals(3, mini.doubleValue(), 0.1);
- assertEquals(3, minf.doubleValue(), 0.1);
- assertEquals(13, maxi.doubleValue(), 0.1);
- assertEquals(9, maxf.doubleValue(), 0.1);
- assertEquals(9.5, avgi.doubleValue(), 0.1);
- assertEquals(6.5, avgf.doubleValue(), 0.1);
- assertEquals(4, count.doubleValue(), 0.1);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.01);
- assertEquals(18, sumf.doubleValue(), 0.01);
- assertEquals(0, mini.doubleValue(), 0.01);
- assertEquals(1, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(4.25, avgi.doubleValue(), 0.01);
- assertEquals(4.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.01);
- assertEquals(4.0, mini.doubleValue(), 0.01);
- assertEquals(4.0, minf.doubleValue(), 0.01);
- assertEquals(11.0, maxi.doubleValue(), 0.01);
- assertEquals(7.0, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
-
- //Test index sort
-
- sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
-
-
- facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- tuples = getTuples(facetStream);
-
- assertEquals(3, tuples.size());
-
-
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.01);
- assertEquals(4, mini.doubleValue(), 0.01);
- assertEquals(4, minf.doubleValue(), 0.01);
- assertEquals(11, maxi.doubleValue(), 0.01);
- assertEquals(7, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertTrue(bucket.equals("hello3"));
- assertTrue(sumi.doubleValue() == 38.0D);
- assertTrue(sumf.doubleValue() == 26.0D);
- assertTrue(mini.doubleValue() == 3.0D);
- assertTrue(minf.doubleValue() == 3.0D);
- assertTrue(maxi.doubleValue() == 13.0D);
- assertTrue(maxf.doubleValue() == 9.0D);
- assertTrue(avgi.doubleValue() == 9.5D);
- assertTrue(avgf.doubleValue() == 6.5D);
- assertTrue(count.doubleValue() == 4);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.01);
- assertEquals(18, sumf.doubleValue(), 0.01);
- assertEquals(0, mini.doubleValue(), 0.01);
- assertEquals(1, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(4.25, avgi.doubleValue(), 0.01);
- assertEquals(4.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
- //Test index sort
-
- sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
-
- facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
-
- tuples = getTuples(facetStream);
-
- assertEquals(3, tuples.size());
-
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.01);
- assertEquals(18, sumf.doubleValue(), 0.01);
- assertEquals(0, mini.doubleValue(), 0.01);
- assertEquals(1, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(4.25, avgi.doubleValue(), 0.0001);
- assertEquals(4.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.01);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.01);
- assertEquals(26, sumf.doubleValue(), 0.01);
- assertEquals(3, mini.doubleValue(), 0.01);
- assertEquals(3, minf.doubleValue(), 0.01);
- assertEquals(13, maxi.doubleValue(), 0.01);
- assertEquals(9, maxf.doubleValue(), 0.01);
- assertEquals(9.5, avgi.doubleValue(), 0.01);
- assertEquals(6.5, avgf.doubleValue(), 0.01);
- assertEquals(4, count.doubleValue(), 0.01);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11.0, sumf.doubleValue(), 0.1);
- assertEquals(4.0, mini.doubleValue(), 0.1);
- assertEquals(4.0, minf.doubleValue(), 0.1);
- assertEquals(11.0, maxi.doubleValue(), 0.1);
- assertEquals(7.0, maxf.doubleValue(), 0.1);
- assertEquals(7.5, avgi.doubleValue(), 0.1);
- assertEquals(5.5, avgf.doubleValue(), 0.1);
- assertEquals(2, count.doubleValue(), 0.1);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
+ ComparatorOrder.ASCENDING)};
+
+ FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+
+ List<Tuple> tuples = getTuples(facetStream);
+
+ assert (tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11.0, sumf.doubleValue(), 0.01);
+ assertEquals(4.0, mini.doubleValue(), 0.01);
+ assertEquals(4.0, minf.doubleValue(), 0.01);
+ assertEquals(11.0, maxi.doubleValue(), 0.01);
+ assertEquals(7.0, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), .01);
+ assertEquals(18, sumf.doubleValue(), .01);
+ assertEquals(0.0, mini.doubleValue(), .01);
+ assertEquals(1.0, minf.doubleValue(), .01);
+ assertEquals(14.0, maxi.doubleValue(), .01);
+ assertEquals(10.0, maxf.doubleValue(), .01);
+ assertEquals(4.25, avgi.doubleValue(), .01);
+ assertEquals(4.5, avgf.doubleValue(), .01);
+ assertEquals(4, count.doubleValue(), .01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38.0, sumi.doubleValue(), 0.01);
+ assertEquals(26.0, sumf.doubleValue(), 0.01);
+ assertEquals(3.0, mini.doubleValue(), 0.01);
+ assertEquals(3.0, minf.doubleValue(), 0.01);
+ assertEquals(13.0, maxi.doubleValue(), 0.01);
+ assertEquals(9.0, maxf.doubleValue(), 0.01);
+ assertEquals(9.5, avgi.doubleValue(), 0.01);
+ assertEquals(6.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+
+ //Reverse the Sort.
+
+ sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
+
+ facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+
+ tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+ //Test Long and Double Sums
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.1);
+ assertEquals(26, sumf.doubleValue(), 0.1);
+ assertEquals(3, mini.doubleValue(), 0.1);
+ assertEquals(3, minf.doubleValue(), 0.1);
+ assertEquals(13, maxi.doubleValue(), 0.1);
+ assertEquals(9, maxf.doubleValue(), 0.1);
+ assertEquals(9.5, avgi.doubleValue(), 0.1);
+ assertEquals(6.5, avgf.doubleValue(), 0.1);
+ assertEquals(4, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.01);
+ assertEquals(18, sumf.doubleValue(), 0.01);
+ assertEquals(0, mini.doubleValue(), 0.01);
+ assertEquals(1, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(4.25, avgi.doubleValue(), 0.01);
+ assertEquals(4.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11, sumf.doubleValue(), 0.01);
+ assertEquals(4.0, mini.doubleValue(), 0.01);
+ assertEquals(4.0, minf.doubleValue(), 0.01);
+ assertEquals(11.0, maxi.doubleValue(), 0.01);
+ assertEquals(7.0, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+
+ //Test index sort
+
+ sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
+
+
+ facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+ facetStream.setStreamContext(streamContext);
+
+ tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11, sumf.doubleValue(), 0.01);
+ assertEquals(4, mini.doubleValue(), 0.01);
+ assertEquals(4, minf.doubleValue(), 0.01);
+ assertEquals(11, maxi.doubleValue(), 0.01);
+ assertEquals(7, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.01);
+ assertEquals(18, sumf.doubleValue(), 0.01);
+ assertEquals(0, mini.doubleValue(), 0.01);
+ assertEquals(1, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(4.25, avgi.doubleValue(), 0.01);
+ assertEquals(4.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ //Test index sort
+
+ sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
+
+ facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
+ facetStream.setStreamContext(streamContext);
+ tuples = getTuples(facetStream);
+
+ assertEquals(3, tuples.size());
+
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.01);
+ assertEquals(18, sumf.doubleValue(), 0.01);
+ assertEquals(0, mini.doubleValue(), 0.01);
+ assertEquals(1, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(4.25, avgi.doubleValue(), 0.0001);
+ assertEquals(4.5, avgf.doubleValue(), 0.001);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.01);
+ assertEquals(26, sumf.doubleValue(), 0.01);
+ assertEquals(3, mini.doubleValue(), 0.01);
+ assertEquals(3, minf.doubleValue(), 0.01);
+ assertEquals(13, maxi.doubleValue(), 0.01);
+ assertEquals(9, maxf.doubleValue(), 0.01);
+ assertEquals(9.5, avgi.doubleValue(), 0.01);
+ assertEquals(6.5, avgf.doubleValue(), 0.01);
+ assertEquals(4, count.doubleValue(), 0.01);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11.0, sumf.doubleValue(), 0.1);
+ assertEquals(4.0, mini.doubleValue(), 0.1);
+ assertEquals(4.0, minf.doubleValue(), 0.1);
+ assertEquals(11.0, maxi.doubleValue(), 0.1);
+ assertEquals(7.0, maxf.doubleValue(), 0.1);
+ assertEquals(7.5, avgi.doubleValue(), 0.1);
+ assertEquals(5.5, avgf.doubleValue(), 0.1);
+ assertEquals(2, count.doubleValue(), 0.1);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -993,173 +1096,181 @@ public void testTrace() throws Exception {
.add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
-
- Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new CountMetric()};
-
- FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
-
- FacetStream facetStream = new FacetStream(
- zkHost,
- COLLECTIONORALIAS,
- sParamsA,
- buckets,
- metrics,
- sorts,
- 100);
-
- List<Tuple> tuples = getTuples(facetStream);
- assertEquals(6, tuples.size());
-
- Tuple tuple = tuples.get(0);
- String bucket1 = tuple.getString("level1_s");
- String bucket2 = tuple.getString("level2_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("b", bucket2);
- assertEquals(35, sumi.longValue());
- assertEquals(3, count, 0.1);
-
- tuple = tuples.get(1);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("b", bucket2);
- assertEquals(15, sumi.longValue());
- assertEquals(2, count, 0.1);
-
- tuple = tuples.get(2);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("b", bucket2);
- assertEquals(11, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(3);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("a", bucket2);
- assertEquals(4, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(4);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("a", bucket2);
- assertEquals(3, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(5);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("a", bucket2);
- assertEquals(2, sumi.longValue());
- assertEquals(2, count.doubleValue(), 0.1);
-
- sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING );
- sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
- facetStream = new FacetStream(
- zkHost,
- COLLECTIONORALIAS,
- sParamsA,
- buckets,
- metrics,
- sorts,
- 100);
-
- tuples = getTuples(facetStream);
- assertEquals(6, tuples.size());
-
- tuple = tuples.get(0);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("b", bucket2);
- assertEquals(11, sumi.longValue());
- assertEquals(1, count, 0.1);
-
- tuple = tuples.get(1);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket1);
- assertEquals("a", bucket2);
- assertEquals(4, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(2);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("b", bucket2);
- assertEquals(35, sumi.longValue());
- assertEquals(3, count.doubleValue(), 0.1);
-
- tuple = tuples.get(3);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket1);
- assertEquals("a", bucket2);
- assertEquals(3, sumi.longValue());
- assertEquals(1, count.doubleValue(), 0.1);
-
- tuple = tuples.get(4);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("b", bucket2);
- assertEquals(15, sumi.longValue());
- assertEquals(2, count.doubleValue(), 0.1);
-
- tuple = tuples.get(5);
- bucket1 = tuple.getString("level1_s");
- bucket2 = tuple.getString("level2_s");
- sumi = tuple.getDouble("sum(a_i)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket1);
- assertEquals("a", bucket2);
- assertEquals(2, sumi.longValue());
- assertEquals(2, count.doubleValue(), 0.1);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
+
+ Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new CountMetric()};
+
+ FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
+
+ FacetStream facetStream = new FacetStream(
+ zkHost,
+ COLLECTIONORALIAS,
+ sParamsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+ facetStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(facetStream);
+ assertEquals(6, tuples.size());
+
+ Tuple tuple = tuples.get(0);
+ String bucket1 = tuple.getString("level1_s");
+ String bucket2 = tuple.getString("level2_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(35, sumi.longValue());
+ assertEquals(3, count, 0.1);
+
+ tuple = tuples.get(1);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(15, sumi.longValue());
+ assertEquals(2, count, 0.1);
+
+ tuple = tuples.get(2);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(11, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(3);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(4, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(4);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(3, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(5);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(2, sumi.longValue());
+ assertEquals(2, count.doubleValue(), 0.1);
+
+ sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING);
+ sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING);
+ facetStream = new FacetStream(
+ zkHost,
+ COLLECTIONORALIAS,
+ sParamsA,
+ buckets,
+ metrics,
+ sorts,
+ 100);
+ facetStream.setStreamContext(streamContext);
+ tuples = getTuples(facetStream);
+ assertEquals(6, tuples.size());
+
+ tuple = tuples.get(0);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(11, sumi.longValue());
+ assertEquals(1, count, 0.1);
+
+ tuple = tuples.get(1);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(4, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(2);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(35, sumi.longValue());
+ assertEquals(3, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(3);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(3, sumi.longValue());
+ assertEquals(1, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(4);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("b", bucket2);
+ assertEquals(15, sumi.longValue());
+ assertEquals(2, count.doubleValue(), 0.1);
+
+ tuple = tuples.get(5);
+ bucket1 = tuple.getString("level1_s");
+ bucket2 = tuple.getString("level2_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket1);
+ assertEquals("a", bucket2);
+ assertEquals(2, sumi.longValue());
+ assertEquals(2, count.doubleValue(), 0.1);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1177,166 +1288,174 @@ public void testTrace() throws Exception {
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
- Bucket[] buckets = {new Bucket("a_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
- List<Tuple> tuples = getTuples(rollupStream);
-
- assert(tuples.size() == 3);
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.001);
- assertEquals(18, sumf.doubleValue(), 0.001);
- assertEquals(0, mini.doubleValue(), 0.001);
- assertEquals(1, minf.doubleValue(), 0.001);
- assertEquals(14, maxi.doubleValue(), 0.001);
- assertEquals(10, maxf.doubleValue(), 0.001);
- assertEquals(4.25, avgi.doubleValue(), 0.001);
- assertEquals(4.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.001);
- assertEquals(26, sumf.doubleValue(), 0.001);
- assertEquals(3, mini.doubleValue(), 0.001);
- assertEquals(3, minf.doubleValue(), 0.001);
- assertEquals(13, maxi.doubleValue(), 0.001);
- assertEquals(9, maxf.doubleValue(), 0.001);
- assertEquals(9.5, avgi.doubleValue(), 0.001);
- assertEquals(6.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.01);
- assertEquals(4, mini.doubleValue(), 0.01);
- assertEquals(4, minf.doubleValue(), 0.01);
- assertEquals(11, maxi.doubleValue(), 0.01);
- assertEquals(7, maxf.doubleValue(), 0.01);
- assertEquals(7.5, avgi.doubleValue(), 0.01);
- assertEquals(5.5, avgf.doubleValue(), 0.01);
- assertEquals(2, count.doubleValue(), 0.01);
-
- // Test will null metrics
- rollupStream = new RollupStream(stream, buckets, metrics);
- tuples = getTuples(rollupStream);
-
- assert(tuples.size() == 3);
- tuple = tuples.get(0);
- bucket = tuple.getString("a_s");
- assertTrue(bucket.equals("hello0"));
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- assertTrue(bucket.equals("hello3"));
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- assertTrue(bucket.equals("hello4"));
-
-
- //Test will null value in the grouping field
- new UpdateRequest()
- .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
- stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
- Bucket[] buckets1 = {new Bucket("a_s")};
-
- Metric[] metrics1 = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- rollupStream = new RollupStream(stream, buckets1, metrics1);
- tuples = getTuples(rollupStream);
- //Check that we've got the extra NULL bucket
- assertEquals(4, tuples.size());
- tuple = tuples.get(0);
- assertEquals("NULL", tuple.getString("a_s"));
-
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals(14, sumi.doubleValue(), 0.01);
- assertEquals(10, sumf.doubleValue(), 0.01);
- assertEquals(14, mini.doubleValue(), 0.01);
- assertEquals(10, minf.doubleValue(), 0.01);
- assertEquals(14, maxi.doubleValue(), 0.01);
- assertEquals(10, maxf.doubleValue(), 0.01);
- assertEquals(14, avgi.doubleValue(), 0.01);
- assertEquals(10, avgf.doubleValue(), 0.01);
- assertEquals(1, count.doubleValue(), 0.01);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+ rollupStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rollupStream);
+
+ assert (tuples.size() == 3);
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.001);
+ assertEquals(18, sumf.doubleValue(), 0.001);
+ assertEquals(0, mini.doubleValue(), 0.001);
+ assertEquals(1, minf.doubleValue(), 0.001);
+ assertEquals(14, maxi.doubleValue(), 0.001);
+ assertEquals(10, maxf.doubleValue(), 0.001);
+ assertEquals(4.25, avgi.doubleValue(), 0.001);
+ assertEquals(4.5, avgf.doubleValue(), 0.001);
+ assertEquals(4, count.doubleValue(), 0.001);
+
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello3", bucket);
+ assertEquals(38, sumi.doubleValue(), 0.001);
+ assertEquals(26, sumf.doubleValue(), 0.001);
+ assertEquals(3, mini.doubleValue(), 0.001);
+ assertEquals(3, minf.doubleValue(), 0.001);
+ assertEquals(13, maxi.doubleValue(), 0.001);
+ assertEquals(9, maxf.doubleValue(), 0.001);
+ assertEquals(9.5, avgi.doubleValue(), 0.001);
+ assertEquals(6.5, avgf.doubleValue(), 0.001);
+ assertEquals(4, count.doubleValue(), 0.001);
+
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals("hello4", bucket);
+ assertEquals(15, sumi.longValue());
+ assertEquals(11, sumf.doubleValue(), 0.01);
+ assertEquals(4, mini.doubleValue(), 0.01);
+ assertEquals(4, minf.doubleValue(), 0.01);
+ assertEquals(11, maxi.doubleValue(), 0.01);
+ assertEquals(7, maxf.doubleValue(), 0.01);
+ assertEquals(7.5, avgi.doubleValue(), 0.01);
+ assertEquals(5.5, avgf.doubleValue(), 0.01);
+ assertEquals(2, count.doubleValue(), 0.01);
+
+ // Test with null metrics
+ rollupStream = new RollupStream(stream, buckets, metrics);
+ rollupStream.setStreamContext(streamContext);
+ tuples = getTuples(rollupStream);
+
+ assert (tuples.size() == 3);
+ tuple = tuples.get(0);
+ bucket = tuple.getString("a_s");
+ assertTrue(bucket.equals("hello0"));
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ assertTrue(bucket.equals("hello3"));
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ assertTrue(bucket.equals("hello4"));
+
+
+ //Test with null value in the grouping field
+ new UpdateRequest()
+ .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+ sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
+ stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+ Bucket[] buckets1 = {new Bucket("a_s")};
+
+ Metric[] metrics1 = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ rollupStream = new RollupStream(stream, buckets1, metrics1);
+ rollupStream.setStreamContext(streamContext);
+ tuples = getTuples(rollupStream);
+ //Check that we've got the extra NULL bucket
+ assertEquals(4, tuples.size());
+ tuple = tuples.get(0);
+ assertEquals("NULL", tuple.getString("a_s"));
+
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertEquals(14, sumi.doubleValue(), 0.01);
+ assertEquals(10, sumf.doubleValue(), 0.01);
+ assertEquals(14, mini.doubleValue(), 0.01);
+ assertEquals(10, minf.doubleValue(), 0.01);
+ assertEquals(14, maxi.doubleValue(), 0.01);
+ assertEquals(10, maxf.doubleValue(), 0.01);
+ assertEquals(14, avgi.doubleValue(), 0.01);
+ assertEquals(10, avgf.doubleValue(), 0.01);
+ assertEquals(1, count.doubleValue(), 0.01);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1347,66 +1466,71 @@ public void testTrace() throws Exception {
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
- SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
+ try {
+ SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
- TopicStream topicStream = new TopicStream(zkHost,
- COLLECTIONORALIAS,
- COLLECTIONORALIAS,
- "50000000",
- -1,
- 1000000, sParams);
+ TopicStream topicStream = new TopicStream(zkHost,
+ COLLECTIONORALIAS,
+ COLLECTIONORALIAS,
+ "50000000",
+ -1,
+ 1000000, sParams);
- DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
- daemonStream.setStreamContext(context);
+ DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
+ daemonStream.setStreamContext(context);
- daemonStream.open();
+ daemonStream.open();
- // Wait for the checkpoint
- JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
+ // Wait for the checkpoint
+ JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
- SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
- int count = 0;
- while(count == 0) {
- SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
- List<Tuple> tuples = getTuples(solrStream);
- count = tuples.size();
- if(count > 0) {
- Tuple t = tuples.get(0);
- assertTrue(t.getLong("id") == 50000000);
- } else {
- System.out.println("###### Waiting for checkpoint #######:" + count);
+ SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
+ int count = 0;
+ while (count == 0) {
+ SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
+ solrStream.setStreamContext(context);
+ List<Tuple> tuples = getTuples(solrStream);
+ count = tuples.size();
+ if (count > 0) {
+ Tuple t = tuples.get(0);
+ assertTrue(t.getLong("id") == 50000000);
+ } else {
+ System.out.println("###### Waiting for checkpoint #######:" + count);
+ }
}
- }
- new UpdateRequest()
- .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
- .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
- .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
- .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+ .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+ .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+ .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- for(int i=0; i<5; i++) {
- daemonStream.read();
- }
+ for (int i = 0; i < 5; i++) {
+ daemonStream.read();
+ }
- new UpdateRequest()
- .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ new UpdateRequest()
+ .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- for(int i=0; i<2; i++) {
- daemonStream.read();
- }
+ for (int i = 0; i < 2; i++) {
+ daemonStream.read();
+ }
- daemonStream.shutdown();
+ daemonStream.shutdown();
- Tuple tuple = daemonStream.read();
+ Tuple tuple = daemonStream.read();
+
+ assertTrue(tuple.EOF);
+ daemonStream.close();
+ } finally {
+ cache.close();
+ }
- assertTrue(tuple.EOF);
- daemonStream.close();
- cache.close();
}
@@ -1426,99 +1550,107 @@ public void testTrace() throws Exception {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
- CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
-
- Bucket[] buckets = {new Bucket("a_s")};
-
- Metric[] metrics = {new SumMetric("a_i"),
- new SumMetric("a_f"),
- new MinMetric("a_i"),
- new MinMetric("a_f"),
- new MaxMetric("a_i"),
- new MaxMetric("a_f"),
- new MeanMetric("a_i"),
- new MeanMetric("a_f"),
- new CountMetric()};
-
- RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
- ParallelStream parallelStream = parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
- attachStreamFactory(parallelStream);
- List<Tuple> tuples = getTuples(parallelStream);
-
- assertEquals(3, tuples.size());
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
-
- assertEquals("hello0", bucket);
- assertEquals(17, sumi.doubleValue(), 0.001);
- assertEquals(18, sumf.doubleValue(), 0.001);
- assertEquals(0, mini.doubleValue(), 0.001);
- assertEquals(1, minf.doubleValue(), 0.001);
- assertEquals(14, maxi.doubleValue(), 0.001);
- assertEquals(10, maxf.doubleValue(), 0.001);
- assertEquals(4.25, avgi.doubleValue(), 0.001);
- assertEquals(4.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello3", bucket);
- assertEquals(38, sumi.doubleValue(), 0.001);
- assertEquals(26, sumf.doubleValue(), 0.001);
- assertEquals(3, mini.doubleValue(), 0.001);
- assertEquals(3, minf.doubleValue(), 0.001);
- assertEquals(13, maxi.doubleValue(), 0.001);
- assertEquals(9, maxf.doubleValue(), 0.001);
- assertEquals(9.5, avgi.doubleValue(), 0.001);
- assertEquals(6.5, avgf.doubleValue(), 0.001);
- assertEquals(4, count.doubleValue(), 0.001);
-
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
-
- assertEquals("hello4", bucket);
- assertEquals(15, sumi.longValue());
- assertEquals(11, sumf.doubleValue(), 0.001);
- assertEquals(4, mini.doubleValue(), 0.001);
- assertEquals(4, minf.doubleValue(), 0.001);
- assertEquals(11, maxi.doubleValue(), 0.001);
- assertEquals(7, maxf.doubleValue(), 0.001);
- assertEquals(7.5, avgi.doubleValue(), 0.001);
- assertEquals(5.5, avgf.doubleValue(), 0.001);
- assertEquals(2, count.doubleValue(), 0.001);
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+ CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {new SumMetric("a_i"),
+ new SumMetric("a_f"),
+ new MinMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_i"),
+ new MaxMetric("a_f"),
+ new MeanMetric("a_i"),
+ new MeanMetric("a_f"),
+ new CountMetric()};
+
+ RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+ ParallelStream parallelStream = parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(parallelStream);
+ parallelStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(parallelStream);
+
+ assertEquals(3, tuples.size());
+
+ //Test Long and Double Sums
+
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertEquals("hello0", bucket);
+ assertEquals(17, sumi.doubleValue(), 0.001);
+
<TRUNCATED>
[3/4] lucene-solr:branch_6x: SOLR-10274: Fix backport
Posted by jb...@apache.org.
SOLR-10274: Fix backport
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b03a7b1c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b03a7b1c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b03a7b1c
Branch: refs/heads/branch_6x
Commit: b03a7b1cfc9939206c8c802b3dca8ecbb6c2e94f
Parents: 740d967
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Apr 11 15:17:03 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Apr 11 22:41:48 2017 -0400
----------------------------------------------------------------------
.../client/solrj/io/stream/CloudSolrStream.java | 41 +-
.../client/solrj/io/stream/ParallelStream.java | 27 +-
.../client/solrj/io/stream/StreamContext.java | 4 +
.../client/solrj/io/stream/JDBCStreamTest.java | 308 +-
.../io/stream/SelectWithEvaluatorsTest.java | 37 +-
.../solrj/io/stream/StreamExpressionTest.java | 3783 ++++++++++--------
.../client/solrj/io/stream/StreamingTest.java | 2505 ++++++------
7 files changed, 3652 insertions(+), 3053 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 7161dc4..6d1764a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -35,7 +33,6 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -52,9 +49,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -178,9 +173,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
+ /*
if(null == zkHost){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
}
+ */
// We've got all the required items
init(collectionName, zkHost, mParams);
@@ -299,14 +296,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
this.tuples = new TreeSet();
this.solrStreams = new ArrayList();
this.eofTuples = Collections.synchronizedMap(new HashMap());
- if (this.streamContext != null && this.streamContext.getSolrClientCache() != null) {
- this.cloudSolrClient = this.streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
- } else {
- this.cloudSolrClient = new Builder()
- .withZkHost(zkHost)
- .build();
- this.cloudSolrClient.connect();
- }
constructStreams();
openStreams();
}
@@ -400,29 +389,15 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected void constructStreams() throws IOException {
try {
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- ClusterState clusterState = zkStateReader.getClusterState();
- Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
+ List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.
- Set<String> liveNodes = clusterState.getLiveNodes();
- for(Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList<>();
- for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
- shuffler.add(replica);
- }
-
- Collections.shuffle(shuffler, new Random());
- Replica rep = shuffler.get(0);
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
- String url = zkProps.getCoreUrl();
- SolrStream solrStream = new SolrStream(url, mParams);
+ for(String shardUrl : shardUrls) {
+ SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {
solrStream.setStreamContext(streamContext);
}
@@ -468,12 +443,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
solrStream.close();
}
}
-
- if ((this.streamContext == null || this.streamContext.getSolrClientCache() == null) &&
- cloudSolrClient != null) {
-
- cloudSolrClient.close();
- }
}
/** Return the stream sort - ie, the order in which records are returned */
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 87e1354..def4c03 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -263,27 +263,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
try {
Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-
- Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
-
- ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
-
- List<Replica> shuffler = new ArrayList<>();
- for(Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
- shuffler.add(replica);
- }
- }
-
- if(workers > shuffler.size()) {
- throw new IOException("Number of workers exceeds nodes in the worker collection");
- }
-
- Collections.shuffle(shuffler, new Random());
+ List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
for(int w=0; w<workers; w++) {
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
@@ -293,9 +273,8 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
paramsLoc.set("expr", pushStream.toString());
paramsLoc.set("qt","/stream");
- Replica rep = shuffler.get(w);
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
- String url = zkProps.getCoreUrl();
+
+ String url = shardUrls.get(w);
SolrStream solrStream = new SolrStream(url, paramsLoc);
solrStreams.add(solrStream);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 6cbf090..d1460ea 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -50,6 +50,10 @@ public class StreamContext implements Serializable{
this.entries.put(key, value);
}
+ public boolean containsKey(Object key) {
+ return entries.containsKey(key);
+ }
+
public Map getEntries() {
return this.entries;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index e55c837..9fff33a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Locale;
import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -205,6 +206,10 @@ public class JDBCStreamTest extends SolrCloudTestCase {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')");
}
+
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
// Load Solr
new UpdateRequest()
@@ -217,18 +222,25 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.withFunctionName("search", CloudSolrStream.class);
List<Tuple> tuples;
-
- // Simple 1
- TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
- TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
- TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
- TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
-
- tuples = getTuples(mergeStream);
-
- assertEquals(7, tuples.size());
- assertOrderOf(tuples, "code_s", "AL","CA","GB","NL","NO","NP","US");
- assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+
+ try {
+ // Simple 1
+ TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
+ TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>() {{
+ put("CODE", "code_s");
+ put("COUNTRY_NAME", "name_s");
+ }});
+ TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
+ TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream, searchStream});
+ mergeStream.setStreamContext(streamContext);
+ tuples = getTuples(mergeStream);
+
+ assertEquals(7, tuples.size());
+ assertOrderOf(tuples, "code_s", "AL", "CA", "GB", "NL", "NO", "NP", "US");
+ assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -277,32 +289,41 @@ public class JDBCStreamTest extends SolrCloudTestCase {
String expression;
TupleStream stream;
List<Tuple> tuples;
-
- // Basic test
- expression =
- "innerJoin("
- + " select("
- + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
- + " personId_i as personId,"
- + " rating_f as rating"
- + " ),"
- + " select("
- + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
- + " ID as personId,"
- + " NAME as personName,"
- + " COUNTRY_NAME as country"
- + " ),"
- + " on=\"personId\""
- + ")";
-
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
-
- assertEquals(10, tuples.size());
- assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
- assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
- assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
- assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ // Basic test
+ expression =
+ "innerJoin("
+ + " select("
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " personId_i as personId,"
+ + " rating_f as rating"
+ + " ),"
+ + " select("
+ + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ + " ID as personId,"
+ + " NAME as personName,"
+ + " COUNTRY_NAME as country"
+ + " ),"
+ + " on=\"personId\""
+ + ")";
+
+
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(10, tuples.size());
+ assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+ assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+ assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+ assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -351,58 +372,67 @@ public class JDBCStreamTest extends SolrCloudTestCase {
String expression;
TupleStream stream;
List<Tuple> tuples;
-
- // Basic test for no alias
- expression =
- "innerJoin("
- + " select("
- + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
- + " personId_i as personId,"
- + " rating_f as rating"
- + " ),"
- + " select("
- + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
- + " ID as personId,"
- + " NAME as personName,"
- + " COUNTRY_NAME as country"
- + " ),"
- + " on=\"personId\""
- + ")";
-
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
-
- assertEquals(10, tuples.size());
- assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
- assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
- assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
- assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
-
- // Basic test for alias
- expression =
- "innerJoin("
- + " select("
- + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
- + " personId_i as personId,"
- + " rating_f as rating"
- + " ),"
- + " select("
- + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
- + " PERSONID as personId,"
- + " NAME as personName,"
- + " COUNTRY_NAME as country"
- + " ),"
- + " on=\"personId\""
- + ")";
-
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
-
- assertEquals(10, tuples.size());
- assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
- assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
- assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
- assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ // Basic test for no alias
+ expression =
+ "innerJoin("
+ + " select("
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " personId_i as personId,"
+ + " rating_f as rating"
+ + " ),"
+ + " select("
+ + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ + " ID as personId,"
+ + " NAME as personName,"
+ + " COUNTRY_NAME as country"
+ + " ),"
+ + " on=\"personId\""
+ + ")";
+
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(10, tuples.size());
+ assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+ assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+ assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+ assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+
+ // Basic test for alias
+ expression =
+ "innerJoin("
+ + " select("
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " personId_i as personId,"
+ + " rating_f as rating"
+ + " ),"
+ + " select("
+ + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
+ + " PERSONID as personId,"
+ + " NAME as personName,"
+ + " COUNTRY_NAME as country"
+ + " ),"
+ + " on=\"personId\""
+ + ")";
+
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(10, tuples.size());
+ assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+ assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+ assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+ assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -439,7 +469,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
-
+
// Load solr data
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
@@ -457,50 +487,58 @@ public class JDBCStreamTest extends SolrCloudTestCase {
String expression;
TupleStream stream;
List<Tuple> tuples;
-
- // Basic test
- expression =
- "rollup("
- + " hashJoin("
- + " hashed=select("
- + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
- + " personId_i as personId,"
- + " rating_f as rating"
- + " ),"
- + " select("
- + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
- + " ID as personId,"
- + " NAME as personName,"
- + " COUNTRY_NAME as country"
- + " ),"
- + " on=\"personId\""
- + " ),"
- + " over=\"country\","
- + " max(rating),"
- + " min(rating),"
- + " avg(rating),"
- + " count(*)"
- + ")";
-
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
-
- assertEquals(2, tuples.size());
-
- Tuple tuple = tuples.get(0);
- assertEquals("Netherlands",tuple.getString("country"));
- assertTrue(4.3D == tuple.getDouble("max(rating)"));
- assertTrue(2.2D == tuple.getDouble("min(rating)"));
- assertTrue(3.6D == tuple.getDouble("avg(rating)"));
- assertTrue(6D == tuple.getDouble("count(*)"));
-
- tuple = tuples.get(1);
- assertEquals("United States",tuple.getString("country"));
- assertTrue(5D == tuple.getDouble("max(rating)"));
- assertTrue(3D == tuple.getDouble("min(rating)"));
- assertTrue(3.95D == tuple.getDouble("avg(rating)"));
- assertTrue(4D == tuple.getDouble("count(*)"));
-
+
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ // Basic test
+ expression =
+ "rollup("
+ + " hashJoin("
+ + " hashed=select("
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " personId_i as personId,"
+ + " rating_f as rating"
+ + " ),"
+ + " select("
+ + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
+ + " ID as personId,"
+ + " NAME as personName,"
+ + " COUNTRY_NAME as country"
+ + " ),"
+ + " on=\"personId\""
+ + " ),"
+ + " over=\"country\","
+ + " max(rating),"
+ + " min(rating),"
+ + " avg(rating),"
+ + " count(*)"
+ + ")";
+
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(2, tuples.size());
+
+ Tuple tuple = tuples.get(0);
+ assertEquals("Netherlands", tuple.getString("country"));
+ assertTrue(4.3D == tuple.getDouble("max(rating)"));
+ assertTrue(2.2D == tuple.getDouble("min(rating)"));
+ assertTrue(3.6D == tuple.getDouble("avg(rating)"));
+ assertTrue(6D == tuple.getDouble("count(*)"));
+
+ tuple = tuples.get(1);
+ assertEquals("United States", tuple.getString("country"));
+ assertTrue(5D == tuple.getDouble("max(rating)"));
+ assertTrue(3D == tuple.getDouble("min(rating)"));
+ assertTrue(3.95D == tuple.getDouble("avg(rating)"));
+ assertTrue(4D == tuple.getDouble("count(*)"));
+ } finally {
+ solrClientCache.close();
+ }
}
@Test(expected=IOException.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index b91df8d..75bf92d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.AddEvaluator;
import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
@@ -92,6 +93,9 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
String clause;
TupleStream stream;
List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -101,21 +105,24 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
.withFunctionName("if", IfThenElseEvaluator.class)
.withFunctionName("gt", GreaterThanEvaluator.class)
;
-
- // Basic test
- clause = "select("
- + "id,"
- + "add(b_i,c_d) as result,"
- + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
- + ")";
- stream = factory.constructStream(clause);
- tuples = getTuples(stream);
- assertFields(tuples, "id", "result");
- assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
- assertEquals(1, tuples.size());
- assertDouble(tuples.get(0), "result", 4.3);
- assertEquals(4.3, tuples.get(0).get("result"));
-
+ try {
+ // Basic test
+ clause = "select("
+ + "id,"
+ + "add(b_i,c_d) as result,"
+ + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
+ + ")";
+ stream = factory.constructStream(clause);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assertFields(tuples, "id", "result");
+ assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
+ assertEquals(1, tuples.size());
+ assertDouble(tuples.get(0), "result", 4.3);
+ assertEquals(4.3, tuples.get(0).get("result"));
+ } finally {
+ solrClientCache.close();
+ }
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
[2/4] lucene-solr:branch_6x: SOLR-10274: Fix backport
Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b03a7b1c/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index bb0bd7e..b69195a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -127,63 +127,119 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
CloudSolrStream stream;
List<Tuple> tuples;
-
- // Basic test
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
- assertLong(tuples.get(0), "a_i", 0);
+ try {
+ // Basic test
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- // Basic w/aliases
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ assertLong(tuples.get(0), "a_i", 0);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
- assertLong(tuples.get(0), "alias.a_i", 0);
- assertString(tuples.get(0), "name", "hello0");
-
- // Basic filtered test
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ // Basic w/aliases
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- assert(tuples.size() == 3);
- assertOrder(tuples, 0, 3, 4);
- assertLong(tuples.get(1), "a_i", 3);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ assertLong(tuples.get(0), "alias.a_i", 0);
+ assertString(tuples.get(0), "name", "hello0");
- try {
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ // Basic filtered test
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
tuples = getTuples(stream);
- throw new Exception("Should be an exception here");
- } catch(Exception e) {
- assertTrue(e.getMessage().contains("q param expected for search function"));
- }
- try {
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", sort=\"a_f asc, a_i asc\")");
+ assert (tuples.size() == 3);
+ assertOrder(tuples, 0, 3, 4);
+ assertLong(tuples.get(1), "a_i", 3);
+
+ try {
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ throw new Exception("Should be an exception here");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("q param expected for search function"));
+ }
+
+ try {
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ throw new Exception("Should be an exception here");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("fl param expected for search function"));
+ }
+
+ try {
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", fl=\"id, a_f\", sort=\"a_f\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ throw new Exception("Should be an exception here");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Invalid sort spec"));
+ }
+
+ // Test with shards param
+
+ List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+ Map<String, List<String>> shardsMap = new HashMap();
+ shardsMap.put("myCollection", shardUrls);
+ StreamContext context = new StreamContext();
+ context.put("shards", shardsMap);
+ context.setSolrClientCache(solrClientCache);
+
+ // Basic test
+ expression = StreamExpressionParser.parse("search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(context);
tuples = getTuples(stream);
- throw new Exception("Should be an exception here");
- } catch(Exception e) {
- assertTrue(e.getMessage().contains("fl param expected for search function"));
- }
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ assertLong(tuples.get(0), "a_i", 0);
- try {
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", fl=\"id, a_f\", sort=\"a_f\")");
- stream = new CloudSolrStream(expression, factory);
+
+ //Exercise the /stream handler
+
+ //Add the shards http parameter for the myCollection
+ StringBuilder buf = new StringBuilder();
+ for (String shardUrl : shardUrls) {
+ if (buf.length() > 0) {
+ buf.append(",");
+ }
+ buf.append(shardUrl);
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", "search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ solrParams.add("myCollection.shards", buf.toString());
+ SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); // sends the expr (qt=/stream) to the first shard URL
+ solrStream.setStreamContext(context); // was set on `stream`, leaving solrStream unused
+ tuples = getTuples(solrStream); // read from the /stream request, not the earlier CloudSolrStream
- throw new Exception("Should be an exception here");
- } catch(Exception e) {
- assertTrue(e.getMessage().contains("Invalid sort spec"));
- }
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ assertLong(tuples.get(0), "a_i", 0);
+
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -200,55 +256,66 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamFactory factory = new StreamFactory();
StreamExpression expression;
CloudSolrStream stream;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
List<Tuple> tuples;
-
- // Basic test
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
- assertLong(tuples.get(0), "a_i", 0);
+ try {
+ // Basic test
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- // Basic w/aliases
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ assertLong(tuples.get(0), "a_i", 0);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
- assertLong(tuples.get(0), "alias.a_i", 0);
- assertString(tuples.get(0), "name", "hello0");
+ // Basic w/aliases
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- // Basic filtered test
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost="
- + cluster.getZkServer().getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ assertLong(tuples.get(0), "alias.a_i", 0);
+ assertString(tuples.get(0), "name", "hello0");
- assert(tuples.size() == 3);
- assertOrder(tuples, 0, 3, 4);
- assertLong(tuples.get(1), "a_i", 3);
+ // Basic filtered test
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost="
+ + cluster.getZkServer().getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 3);
+ assertOrder(tuples, 0, 3, 4);
+ assertLong(tuples.get(1), "a_i", 3);
- // Test a couple of multile field lists.
- expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " +
- "zkHost=" + cluster.getZkServer().getZkAddress()+ ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
- assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0);
+ // Test a couple of multiple field lists.
+ expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " +
+ "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0);
- expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " +
- "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
- assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2);
-
-
+ expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " +
+ "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2);
+
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -315,43 +382,53 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class);
-
- // Basic test
- expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
- stream = new UniqueStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 0, 1, 3, 4);
- // Basic test desc
- expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
- stream = new UniqueStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 4, 3, 1, 2);
-
- // Basic w/multi comp
- expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
- stream = new UniqueStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 5);
- assertOrder(tuples, 0,2,1,3,4);
-
- // full factory w/multi comp
- stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
+ try {
+ // Basic test
+ expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
+ stream = new UniqueStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 0, 1, 3, 4);
+
+ // Basic test desc
+ expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
+ stream = new UniqueStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 4, 3, 1, 2);
+
+ // Basic w/multi comp
+ expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+ stream = new UniqueStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+
+ // full factory w/multi comp
+ stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -369,30 +446,38 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
-
- StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("sort", SortStream.class);
-
- // Basic test
- stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
- tuples = getTuples(stream);
- assert(tuples.size() == 6);
- assertOrder(tuples, 0, 1, 5, 2, 3, 4);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ StreamFactory factory = new StreamFactory()
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("sort", SortStream.class);
- // Basic test desc
- stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
- tuples = getTuples(stream);
- assert(tuples.size() == 6);
- assertOrder(tuples, 4,3,2,1,5,0);
-
- // Basic w/multi comp
- stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
- tuples = getTuples(stream);
- assert(tuples.size() == 6);
- assertOrder(tuples, 0,5,1,2,3,4);
+ // Basic test
+ stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 6);
+ assertOrder(tuples, 0, 1, 5, 2, 3, 4);
+ // Basic test desc
+ stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 6);
+ assertOrder(tuples, 4, 3, 2, 1, 5, 0);
+
+ // Basic w/multi comp
+ stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 6);
+ assertOrder(tuples, 0, 5, 1, 2, 3, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -411,17 +496,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("null", NullStream.class);
- // Basic test
- stream = factory.constructStream("null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
- tuples = getTuples(stream);
- assertTrue(tuples.size() == 1);
- assertTrue(tuples.get(0).getLong("nullCount") == 6);
+ try {
+ // Basic test
+ stream = factory.constructStream("null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assertTrue(tuples.size() == 1);
+ assertTrue(tuples.get(0).getLong("nullCount") == 6);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -440,6 +532,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
@@ -447,18 +542,23 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("null", NullStream.class)
.withFunctionName("parallel", ParallelStream.class);
- // Basic test
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
- tuples = getTuples(stream);
- assertTrue(tuples.size() == 2);
- long nullCount = 0;
- for(Tuple t : tuples) {
- nullCount += t.getLong("nullCount");
- }
+ try {
- assertEquals(nullCount, 6L);
- }
+ // Basic test
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"nullCount desc\", null(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), by=\"a_i asc\"))");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assertTrue(tuples.size() == 2);
+ long nullCount = 0;
+ for (Tuple t : tuples) {
+ nullCount += t.getLong("nullCount");
+ }
+ assertEquals(nullCount, 6L);
+ } finally {
+ solrClientCache.close();
+ }
+ }
@Test
public void testNulls() throws Exception {
@@ -475,49 +575,59 @@ public class StreamExpressionTest extends SolrCloudTestCase {
TupleStream stream;
List<Tuple> tuples;
Tuple tuple;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class);
- // Basic test
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ try {
+ // Basic test
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- assert(tuples.size() == 5);
- assertOrder(tuples, 4, 0, 1, 2, 3);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 4, 0, 1, 2, 3);
- tuple = tuples.get(0);
- assertTrue("hello4".equals(tuple.getString("a_s")));
- assertNull(tuple.get("s_multi"));
- assertNull(tuple.get("i_multi"));
- assertNull(tuple.getLong("a_i"));
+ tuple = tuples.get(0);
+ assertTrue("hello4".equals(tuple.getString("a_s")));
+ assertNull(tuple.get("s_multi"));
+ assertNull(tuple.get("i_multi"));
+ assertNull(tuple.getLong("a_i"));
- tuple = tuples.get(1);
- assertNull(tuple.get("a_s"));
- List<String> strings = tuple.getStrings("s_multi");
- assertNotNull(strings);
- assertEquals("aaa", strings.get(0));
- assertEquals("bbb", strings.get(1));
- List<Long> longs = tuple.getLongs("i_multi");
- assertNotNull(longs);
-
- //test sort (asc) with null string field. Null should sort to the top.
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ tuple = tuples.get(1);
+ assertNull(tuple.get("a_s"));
+ List<String> strings = tuple.getStrings("s_multi");
+ assertNotNull(strings);
+ assertEquals("aaa", strings.get(0));
+ assertEquals("bbb", strings.get(1));
+ List<Long> longs = tuple.getLongs("i_multi");
+ assertNotNull(longs);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 1, 2, 3, 4);
+ //test sort (asc) with null string field. Null should sort to the top.
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- //test sort(desc) with null string field. Null should sort to the bottom.
- expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
- stream = new CloudSolrStream(expression, factory);
- tuples = getTuples(stream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 1, 2, 3, 4);
- assert(tuples.size() == 5);
- assertOrder(tuples, 4, 3, 2, 1, 0);
+ //test sort(desc) with null string field. Null should sort to the bottom.
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 4, 3, 2, 1, 0);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -546,55 +656,67 @@ public class StreamExpressionTest extends SolrCloudTestCase {
+ "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "on=\"a_f asc\")");
- stream = new MergeStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 0, 1, 3, 4);
- // Basic test desc
- expression = StreamExpressionParser.parse("merge("
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
- + "on=\"a_f desc\")");
stream = new MergeStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 4, 3, 1, 0);
-
- // Basic w/multi comp
- expression = StreamExpressionParser.parse("merge("
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "on=\"a_f asc, a_s asc\")");
- stream = new MergeStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
-
- // full factory w/multi comp
- stream = factory.constructStream("merge("
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "on=\"a_f asc, a_s asc\")");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 2, 1, 3, 4);
-
- // full factory w/multi streams
- stream = factory.constructStream("merge("
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "on=\"a_f asc\")");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 0, 2, 1, 4);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 0, 1, 3, 4);
+
+ // Basic test desc
+ expression = StreamExpressionParser.parse("merge("
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ + "on=\"a_f desc\")");
+ stream = new MergeStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 4, 3, 1, 0);
+
+ // Basic w/multi comp
+ expression = StreamExpressionParser.parse("merge("
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "on=\"a_f asc, a_s asc\")");
+ stream = new MergeStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+
+ // full factory w/multi comp
+ stream = factory.constructStream("merge("
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "on=\"a_f asc, a_s asc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 2, 1, 3, 4);
+
+ // full factory w/multi streams
+ stream = factory.constructStream("merge("
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "on=\"a_f asc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 0, 2, 1, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -611,61 +733,70 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class);
-
- // Basic test
- expression = StreamExpressionParser.parse("top("
- + "n=3,"
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
- + "sort=\"a_f asc, a_i asc\")");
- stream = new RankStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 3);
- assertOrder(tuples, 0, 2, 1);
+ try {
+ // Basic test
+ expression = StreamExpressionParser.parse("top("
+ + "n=3,"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ + "sort=\"a_f asc, a_i asc\")");
+ stream = new RankStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- // Basic test desc
- expression = StreamExpressionParser.parse("top("
- + "n=2,"
- + "unique("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
- + "over=\"a_f\"),"
- + "sort=\"a_f desc\")");
- stream = new RankStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 2);
- assertOrder(tuples, 4, 3);
-
- // full factory
- stream = factory.constructStream("top("
- + "n=4,"
- + "unique("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
- + "over=\"a_f\"),"
- + "sort=\"a_f asc\")");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 0,1,3,4);
+ assert (tuples.size() == 3);
+ assertOrder(tuples, 0, 2, 1);
+
+ // Basic test desc
+ expression = StreamExpressionParser.parse("top("
+ + "n=2,"
+ + "unique("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ + "over=\"a_f\"),"
+ + "sort=\"a_f desc\")");
+ stream = new RankStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- // full factory, switch order
- stream = factory.constructStream("top("
- + "n=4,"
- + "unique("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
- + "over=\"a_f\"),"
- + "sort=\"a_f asc\")");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 4);
- assertOrder(tuples, 2,1,3,4);
+ assert (tuples.size() == 2);
+ assertOrder(tuples, 4, 3);
+
+ // full factory
+ stream = factory.constructStream("top("
+ + "n=4,"
+ + "unique("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ + "over=\"a_f\"),"
+ + "sort=\"a_f asc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 0, 1, 3, 4);
+
+ // full factory, switch order
+ stream = factory.constructStream("top("
+ + "n=4,"
+ + "unique("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+ + "over=\"a_f\"),"
+ + "sort=\"a_f asc\")");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 4);
+ assertOrder(tuples, 2, 1, 3, 4);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -735,7 +866,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
//Exercise the /stream handler
ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream"));
- sParams.add("expr", "random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")" );
+ sParams.add("expr", "random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
JettySolrRunner jetty = cluster.getJettySolrRunner(0);
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
List<Tuple> tuples4 = getTuples(solrStream);
@@ -767,61 +898,69 @@ public class StreamExpressionTest extends SolrCloudTestCase {
List<Tuple> tuples;
Tuple t0, t1, t2;
List<Map> maps0, maps1, maps2;
-
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class);
- // basic
- expression = StreamExpressionParser.parse("reduce("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
- + "by=\"a_s\","
- + "group(sort=\"a_f desc\", n=\"4\"))");
+ try {
+ // basic
+ expression = StreamExpressionParser.parse("reduce("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ + "by=\"a_s\","
+ + "group(sort=\"a_f desc\", n=\"4\"))");
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- assert(tuples.size() == 3);
+ assert (tuples.size() == 3);
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assertMaps(maps0, 9, 1, 2, 0);
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assertMaps(maps0, 9, 1, 2, 0);
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 8, 7, 5, 3);
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 8, 7, 5, 3);
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 6, 4);
-
- // basic w/spaces
- expression = StreamExpressionParser.parse("reduce("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
- + "by=\"a_s\"," +
- "group(sort=\"a_i asc\", n=\"2\"))");
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 6, 4);
- assert(tuples.size() == 3);
+ // basic w/spaces
+ expression = StreamExpressionParser.parse("reduce("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ + "by=\"a_s\"," +
+ "group(sort=\"a_i asc\", n=\"2\"))");
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assert(maps0.size() == 2);
+ assert (tuples.size() == 3);
- assertMaps(maps0, 0, 1);
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assert (maps0.size() == 2);
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 3, 5);
+ assertMaps(maps0, 0, 1);
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 4, 6);
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 3, 5);
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 4, 6);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1158,66 +1297,76 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("parallel", ParallelStream.class)
- .withFunctionName("fetch", FetchStream.class);
-
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 10);
- Tuple t = tuples.get(0);
- assertTrue("blah blah blah 0".equals(t.getString("subject")));
- t = tuples.get(1);
- assertTrue("blah blah blah 2".equals(t.getString("subject")));
- t = tuples.get(2);
- assertTrue("blah blah blah 3".equals(t.getString("subject")));
- t = tuples.get(3);
- assertTrue("blah blah blah 4".equals(t.getString("subject")));
- t = tuples.get(4);
- assertTrue("blah blah blah 1".equals(t.getString("subject")));
- t = tuples.get(5);
- assertTrue("blah blah blah 5".equals(t.getString("subject")));
- t = tuples.get(6);
- assertTrue("blah blah blah 6".equals(t.getString("subject")));
- t = tuples.get(7);
- assertTrue("blah blah blah 7".equals(t.getString("subject")));
- t = tuples.get(8);
- assertTrue("blah blah blah 8".equals(t.getString("subject")));
- t = tuples.get(9);
- assertTrue("blah blah blah 9".equals(t.getString("subject")));
-
-
- stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
- tuples = getTuples(stream);
-
- assert(tuples.size() == 10);
- t = tuples.get(0);
- assertTrue("blah blah blah 0".equals(t.getString("subject")));
- t = tuples.get(1);
- assertTrue("blah blah blah 2".equals(t.getString("subject")));
- t = tuples.get(2);
- assertTrue("blah blah blah 3".equals(t.getString("subject")));
- t = tuples.get(3);
- assertTrue("blah blah blah 4".equals(t.getString("subject")));
- t = tuples.get(4);
- assertTrue("blah blah blah 1".equals(t.getString("subject")));
- t = tuples.get(5);
- assertTrue("blah blah blah 5".equals(t.getString("subject")));
- t = tuples.get(6);
- assertTrue("blah blah blah 6".equals(t.getString("subject")));
- t = tuples.get(7);
- assertTrue("blah blah blah 7".equals(t.getString("subject")));
- t = tuples.get(8);
- assertTrue("blah blah blah 8".equals(t.getString("subject")));
- t = tuples.get(9);
- assertTrue("blah blah blah 9".equals(t.getString("subject")));
+ .withFunctionName("parallel", ParallelStream.class)
+ .withFunctionName("fetch", FetchStream.class);
+
+ try {
+
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 10);
+ Tuple t = tuples.get(0);
+ assertTrue("blah blah blah 0".equals(t.getString("subject")));
+ t = tuples.get(1);
+ assertTrue("blah blah blah 2".equals(t.getString("subject")));
+ t = tuples.get(2);
+ assertTrue("blah blah blah 3".equals(t.getString("subject")));
+ t = tuples.get(3);
+ assertTrue("blah blah blah 4".equals(t.getString("subject")));
+ t = tuples.get(4);
+ assertTrue("blah blah blah 1".equals(t.getString("subject")));
+ t = tuples.get(5);
+ assertTrue("blah blah blah 5".equals(t.getString("subject")));
+ t = tuples.get(6);
+ assertTrue("blah blah blah 6".equals(t.getString("subject")));
+ t = tuples.get(7);
+ assertTrue("blah blah blah 7".equals(t.getString("subject")));
+ t = tuples.get(8);
+ assertTrue("blah blah blah 8".equals(t.getString("subject")));
+ t = tuples.get(9);
+ assertTrue("blah blah blah 9".equals(t.getString("subject")));
+
+
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 10);
+ t = tuples.get(0);
+ assertTrue("blah blah blah 0".equals(t.getString("subject")));
+ t = tuples.get(1);
+ assertTrue("blah blah blah 2".equals(t.getString("subject")));
+ t = tuples.get(2);
+ assertTrue("blah blah blah 3".equals(t.getString("subject")));
+ t = tuples.get(3);
+ assertTrue("blah blah blah 4".equals(t.getString("subject")));
+ t = tuples.get(4);
+ assertTrue("blah blah blah 1".equals(t.getString("subject")));
+ t = tuples.get(5);
+ assertTrue("blah blah blah 5".equals(t.getString("subject")));
+ t = tuples.get(6);
+ assertTrue("blah blah blah 6".equals(t.getString("subject")));
+ t = tuples.get(7);
+ assertTrue("blah blah blah 7".equals(t.getString("subject")));
+ t = tuples.get(8);
+ assertTrue("blah blah blah 8".equals(t.getString("subject")));
+ t = tuples.get(9);
+ assertTrue("blah blah blah 9".equals(t.getString("subject")));
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1260,87 +1409,91 @@ public class StreamExpressionTest extends SolrCloudTestCase {
+ "sum(a_i)"
+ "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
daemonStream = (DaemonStream)factory.constructStream(expression);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ daemonStream.setStreamContext(streamContext);
+ try {
+ //Test Long and Double Sums
+ daemonStream.open(); // This will start the daemon thread
- //Test Long and Double Sums
-
- daemonStream.open(); // This will start the daemon thread
-
- for(int i=0; i<4; i++) {
- Tuple tuple = daemonStream.read(); // Reads from the queue
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
-
- //System.out.println("#################################### Bucket 1:"+bucket);
- assertTrue(bucket.equals("hello0"));
- assertTrue(sumi.doubleValue() == 17.0D);
-
- tuple = daemonStream.read();
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
-
- //System.out.println("#################################### Bucket 2:"+bucket);
- assertTrue(bucket.equals("hello3"));
- assertTrue(sumi.doubleValue() == 38.0D);
+ for (int i = 0; i < 4; i++) {
+ Tuple tuple = daemonStream.read(); // Reads from the queue
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
- tuple = daemonStream.read();
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- //System.out.println("#################################### Bucket 3:"+bucket);
- assertTrue(bucket.equals("hello4"));
- assertTrue(sumi.longValue() == 15);
- }
+ //System.out.println("#################################### Bucket 1:"+bucket);
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
- //Now lets wait until the internal queue fills up
+ tuple = daemonStream.read();
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
- while(daemonStream.remainingCapacity() > 0) {
- try {
- Thread.sleep(1000);
- } catch (Exception e) {
+ //System.out.println("#################################### Bucket 2:"+bucket);
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ tuple = daemonStream.read();
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ //System.out.println("#################################### Bucket 3:"+bucket);
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
}
- }
-
- //OK capacity is full, let's index a new doc
- new UpdateRequest()
- .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ //Now let's wait until the internal queue fills up
- //Now lets clear the existing docs in the queue 9, plus 3 more to get passed the run that was blocked. The next run should
- //have the tuples with the updated count.
- for(int i=0; i<12;i++) {
- daemonStream.read();
- }
+ while (daemonStream.remainingCapacity() > 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
- //And rerun the loop. It should have a new count for hello0
- for(int i=0; i<4; i++) {
- Tuple tuple = daemonStream.read(); // Reads from the queue
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
+ }
+ }
- //System.out.println("#################################### Bucket 1:"+bucket);
- assertTrue(bucket.equals("hello0"));
- assertTrue(sumi.doubleValue() == 18.0D);
+ //OK capacity is full, let's index a new doc
- tuple = daemonStream.read();
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
+ new UpdateRequest()
+ .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- //System.out.println("#################################### Bucket 2:"+bucket);
- assertTrue(bucket.equals("hello3"));
- assertTrue(sumi.doubleValue() == 38.0D);
+ //Now let's clear the 9 existing docs in the queue, plus 3 more to get past the run that was blocked. The next run should
+ //have the tuples with the updated count.
+ for (int i = 0; i < 12; i++) {
+ daemonStream.read();
+ }
- tuple = daemonStream.read();
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- //System.out.println("#################################### Bucket 3:"+bucket);
- assertTrue(bucket.equals("hello4"));
- assertTrue(sumi.longValue() == 15);
+ //And rerun the loop. It should have a new count for hello0
+ for (int i = 0; i < 4; i++) {
+ Tuple tuple = daemonStream.read(); // Reads from the queue
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+
+ //System.out.println("#################################### Bucket 1:"+bucket);
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 18.0D);
+
+ tuple = daemonStream.read();
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+
+ //System.out.println("#################################### Bucket 2:"+bucket);
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+
+ tuple = daemonStream.read();
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ //System.out.println("#################################### Bucket 3:"+bucket);
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ }
+ } finally {
+ daemonStream.close(); //This should stop the daemon thread
+ solrClientCache.close();
}
-
- daemonStream.close(); //This should stop the daemon thread
-
}
@@ -1412,96 +1565,103 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ expression = StreamExpressionParser.parse("rollup("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
+ + "over=\"a_s\","
+ + "sum(a_i),"
+ + "sum(a_f),"
+ + "min(a_i),"
+ + "min(a_f),"
+ + "max(a_i),"
+ + "max(a_f),"
+ + "avg(a_i),"
+ + "avg(a_f),"
+ + "count(*),"
+ + ")");
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- expression = StreamExpressionParser.parse("rollup("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
- + "over=\"a_s\","
- + "sum(a_i),"
- + "sum(a_f),"
- + "min(a_i),"
- + "min(a_f),"
- + "max(a_i),"
- + "max(a_f),"
- + "avg(a_i),"
- + "avg(a_f),"
- + "count(*),"
- + ")");
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 3);
-
- //Test Long and Double Sums
-
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
+ assert (tuples.size() == 3);
- assertTrue(bucket.equals("hello0"));
- assertTrue(sumi.doubleValue() == 17.0D);
- assertTrue(sumf.doubleValue() == 18.0D);
- assertTrue(mini.doubleValue() == 0.0D);
- assertTrue(minf.doubleValue() == 1.0D);
- assertTrue(maxi.doubleValue() == 14.0D);
- assertTrue(maxf.doubleValue() == 10.0D);
- assertTrue(avgi.doubleValue() == 4.25D);
- assertTrue(avgf.doubleValue() == 4.5D);
- assertTrue(count.doubleValue() == 4);
+ //Test Long and Double Sums
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
- assertTrue(bucket.equals("hello3"));
- assertTrue(sumi.doubleValue() == 38.0D);
- assertTrue(sumf.doubleValue() == 26.0D);
- assertTrue(mini.doubleValue() == 3.0D);
- assertTrue(minf.doubleValue() == 3.0D);
- assertTrue(maxi.doubleValue() == 13.0D);
- assertTrue(maxf.doubleValue() == 9.0D);
- assertTrue(avgi.doubleValue() == 9.5D);
- assertTrue(avgf.doubleValue() == 6.5D);
- assertTrue(count.doubleValue() == 4);
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
- assertTrue(bucket.equals("hello4"));
- assertTrue(sumi.longValue() == 15);
- assertTrue(sumf.doubleValue() == 11.0D);
- assertTrue(mini.doubleValue() == 4.0D);
- assertTrue(minf.doubleValue() == 4.0D);
- assertTrue(maxi.doubleValue() == 11.0D);
- assertTrue(maxf.doubleValue() == 7.0D);
- assertTrue(avgi.doubleValue() == 7.5D);
- assertTrue(avgf.doubleValue() == 5.5D);
- assertTrue(count.doubleValue() == 2);
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1588,18 +1748,27 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
- List<Tuple> tuples = getTuples(pstream);
- assert(tuples.size() == 5);
- assertOrder(tuples, 0, 1, 3, 4, 6);
+ try {
- //Test the eofTuples
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
+ assert (tuples.size() == 5);
+ assertOrder(tuples, 0, 1, 3, 4, 6);
- Map<String,Tuple> eofTuples = pstream.getEofTuples();
- assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+ //Test the eofTuples
+ Map<String, Tuple> eofTuples = pstream.getEofTuples();
+ assert (eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1666,23 +1835,32 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
String zkHost = cluster.getZkServer().getZkAddress();
StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("shuffle", ShuffleStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
-
- List<Tuple> tuples = getTuples(pstream);
- assert(tuples.size() == 6);
- assertOrder(tuples, 0, 1, 3, 4, 6, 56);
-
- //Test the eofTuples
-
- Map<String,Tuple> eofTuples = pstream.getEofTuples();
- assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
- assert(pstream.toExpression(streamFactory).toString().contains("shuffle"));
+ try {
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_f asc\")");
+ pstream.setStreamFactory(streamFactory);
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
+ assert (tuples.size() == 6);
+ assertOrder(tuples, 0, 1, 3, 4, 6, 56);
+
+ //Test the eofTuples
+
+ Map<String, Tuple> eofTuples = pstream.getEofTuples();
+ assert (eofTuples.size() == 2); //There should be an EOF tuple for each worker.
+ assert (pstream.toExpression(streamFactory).toString().contains("shuffle"));
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1702,6 +1880,11 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+
String zkHost = cluster.getZkServer().getZkAddress();
StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("search", CloudSolrStream.class)
@@ -1709,54 +1892,62 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
- "reduce(" +
- "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
- "by=\"a_s\"," +
- "group(sort=\"a_i asc\", n=\"5\")), " +
- "workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
- List<Tuple> tuples = getTuples(pstream);
+ try {
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
+ "reduce(" +
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
+ "by=\"a_s\"," +
+ "group(sort=\"a_i asc\", n=\"5\")), " +
+ "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s asc\")");
+
+ pstream.setStreamContext(streamContext);
- assert(tuples.size() == 3);
+ List<Tuple> tuples = getTuples(pstream);
- Tuple t0 = tuples.get(0);
- List<Map> maps0 = t0.getMaps("group");
- assertMaps(maps0, 0, 1, 2, 9);
+ assert (tuples.size() == 3);
- Tuple t1 = tuples.get(1);
- List<Map> maps1 = t1.getMaps("group");
- assertMaps(maps1, 3, 5, 7, 8);
+ Tuple t0 = tuples.get(0);
+ List<Map> maps0 = t0.getMaps("group");
+ assertMaps(maps0, 0, 1, 2, 9);
- Tuple t2 = tuples.get(2);
- List<Map> maps2 = t2.getMaps("group");
- assertMaps(maps2, 4, 6);
+ Tuple t1 = tuples.get(1);
+ List<Map> maps1 = t1.getMaps("group");
+ assertMaps(maps1, 3, 5, 7, 8);
+ Tuple t2 = tuples.get(2);
+ List<Map> maps2 = t2.getMaps("group");
+ assertMaps(maps2, 4, 6);
- pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
- "reduce(" +
- "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
- "by=\"a_s\", " +
- "group(sort=\"a_i desc\", n=\"5\")),"+
- "workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
- tuples = getTuples(pstream);
+ pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
+ "reduce(" +
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
+ "by=\"a_s\", " +
+ "group(sort=\"a_i desc\", n=\"5\"))," +
+ "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s desc\")");
- assert(tuples.size() == 3);
+ pstream.setStreamContext(streamContext);
+ tuples = getTuples(pstream);
- t0 = tuples.get(0);
- maps0 = t0.getMaps("group");
- assertMaps(maps0, 6, 4);
+ assert (tuples.size() == 3);
+
+ t0 = tuples.get(0);
+ maps0 = t0.getMaps("group");
+ assertMaps(maps0, 6, 4);
- t1 = tuples.get(1);
- maps1 = t1.getMaps("group");
- assertMaps(maps1, 8, 7, 5, 3);
+ t1 = tuples.get(1);
+ maps1 = t1.getMaps("group");
+ assertMaps(maps1, 8, 7, 5, 3);
- t2 = tuples.get(2);
- maps2 = t2.getMaps("group");
- assertMaps(maps2, 9, 2, 1, 0);
+ t2 = tuples.get(2);
+ maps2 = t2.getMaps("group");
+ assertMaps(maps2, 9, 2, 1, 0);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1784,17 +1975,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("group", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel("
- + COLLECTIONORALIAS + ", "
- + "top("
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel("
+ + COLLECTIONORALIAS + ", "
+ + "top("
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+ "n=\"11\", "
- + "sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
-
- List<Tuple> tuples = getTuples(pstream);
+ + "sort=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
- assert(tuples.size() == 10);
- assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
+ assert (tuples.size() == 10);
+ assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1823,24 +2021,29 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("merge", MergeStream.class)
.withFunctionName("parallel", ParallelStream.class);
- //Test ascending
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i asc\")");
-
- List<Tuple> tuples = getTuples(pstream);
-
-
-
- assert(tuples.size() == 9);
- assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ try {
+ //Test ascending
+ ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")");
+ pstream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(pstream);
- //Test descending
+ assert (tuples.size() == 9);
+ assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
- pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+ //Test descending
- tuples = getTuples(pstream);
+ pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i desc\")");
+ pstream.setStreamContext(streamContext);
+ tuples = getTuples(pstream);
- assert(tuples.size() == 8);
- assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
+ assert (tuples.size() == 8);
+ assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
+ } finally {
+ solrClientCache.close();
+ }
}
@@ -1869,104 +2072,115 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
- .withFunctionName("count", CountMetric.class);
-
- StreamExpression expression;
- TupleStream stream;
- List<Tuple> tuples;
+ .withFunctionName("count", CountMetric.class);
- expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
- + "rollup("
- + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
- + "over=\"a_s\","
- + "sum(a_i),"
- + "sum(a_f),"
- + "min(a_i),"
- + "min(a_f),"
- + "max(a_i),"
- + "max(a_f),"
- + "avg(a_i),"
- + "avg(a_f),"
- + "count(*)"
- + "),"
- + "workers=\"2\", zkHost=\""+cluster.getZkServer().getZkAddress()+"\", sort=\"a_s asc\")"
- );
- stream = factory.constructStream(expression);
- tuples = getTuples(stream);
- assert(tuples.size() == 3);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
- //Test Long and Double Sums
+ StreamExpression expression;
+ TupleStream stream;
+ List<Tuple> tuples;
- Tuple tuple = tuples.get(0);
- String bucket = tuple.getString("a_s");
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
+ try {
+ expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
+ + "rollup("
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+ + "over=\"a_s\","
+ + "sum(a_i),"
+ + "sum(a_f),"
+ + "min(a_i),"
+ + "min(a_f),"
+ + "max(a_i),"
+ + "max(a_f),"
+ + "avg(a_i),"
+ + "avg(a_f),"
+ + "count(*)"
+ + "),"
+ + "workers=\"2\", zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", sort=\"a_s asc\")"
+ );
- assertTrue(bucket.equals("hello0"));
- assertTrue(sumi.doubleValue() == 17.0D);
- assertTrue(sumf.doubleValue() == 18.0D);
- assertTrue(mini.doubleValue() == 0.0D);
- assertTrue(minf.doubleValue() == 1.0D);
- assertTrue(maxi.doubleValue() == 14.0D);
- assertTrue(maxf.doubleValue() == 10.0D);
- assertTrue(avgi.doubleValue() == 4.25D);
- assertTrue(avgf.doubleValue() == 4.5D);
- assertTrue(count.doubleValue() == 4);
- tuple = tuples.get(1);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
- assertTrue(bucket.equals("hello3"));
- assertTrue(sumi.doubleValue() == 38.0D);
- assertTrue(sumf.doubleValue() == 26.0D);
- assertTrue(mini.doubleValue() == 3.0D);
- assertTrue(minf.doubleValue() == 3.0D);
- assertTrue(maxi.doubleValue() == 13.0D);
- assertTrue(maxf.doubleValue() == 9.0D);
- assertTrue(avgi.doubleValue() == 9.5D);
- assertTrue(avgf.doubleValue() == 6.5D);
- assertTrue(count.doubleValue() == 4);
+ assert (tuples.size() == 3);
- tuple = tuples.get(2);
- bucket = tuple.getString("a_s");
- sumi = tuple.getDouble("sum(a_i)");
- sumf = tuple.getDouble("sum(a_f)");
- mini = tuple.getDouble("min(a_i)");
- minf = tuple.getDouble("min(a_f)");
- maxi = tuple.getDouble("max(a_i)");
- maxf = tuple.getDouble("max(a_f)");
- avgi = tuple.getDouble("avg(a_i)");
- avgf = tuple.getDouble("avg(a_f)");
- count = tuple.getDouble("count(*)");
+ //Test Long and Double Sums
- assertTrue(bucket.equals("hello4"));
- assertTrue(sumi.longValue() == 15);
- assertTrue(sumf.doubleValue() == 11.0D);
- assertTrue(mini.doubleValue() == 4.0D);
- assertTrue(minf.doubleValue() == 4.0D);
- assertTrue(maxi.doubleValue() == 11.0D);
- assertTrue(maxf.doubleValue() == 7.0D);
- assertTrue(avgi.doubleValue() == 7.5D);
- assertTrue(avgf.doubleValue() == 5.5D);
- assertTrue(count.doubleValue() == 2);
+ Tuple tuple = tuples.get(0);
+ String bucket = tuple.getString("a_s");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello0"));
+ assertTrue(sumi.doubleValue() == 17.0D);
+ assertTrue(sumf.doubleValue() == 18.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 4.25D);
+ assertTrue(avgf.doubleValue() == 4.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(1);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(bucket.equals("hello3"));
+ assertTrue(sumi.doubleValue() == 38.0D);
+ assertTrue(sumf.doubleValue() == 26.0D);
+ assertTrue(mini.doubleValue() == 3.0D);
+ assertTrue(minf.doubleValue() == 3.0D);
+ assertTrue(maxi.doubleValue() == 13.0D);
+ assertTrue(maxf.doubleValue() == 9.0D);
+ assertTrue(avgi.doubleValue() == 9.5D);
+ assertTrue(avgf.doubleValue() == 6.5D);
+ assertTrue(count.doubleValue() == 4);
+
+ tuple = tuples.get(2);
+ bucket = tuple.getString("a_s");
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+ assertTrue(bucket.equals("hello4"));
+ assertTrue(sumi.longValue() == 15);
+ assertTrue(sumf.doubleValue() == 11.0D);
+ assertTrue(mini.doubleValue() == 4.0D);
+ assertTrue(minf.doubleValue() == 4.0D);
+ assertTrue(maxi.doubleValue() == 11.0D);
+ assertTrue(maxf.doubleValue() == 7.0D);
+ assertTrue(avgi.doubleValue() == 7.5D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 2);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -1994,52 +2208,62 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class);
-
- // Basic test
- expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
- + "on=\"join1_i=join1_i, join2_s=join2_s\")");
- stream = new InnerJoinStream(expression, factory);
- tuples = getTuples(stream);
- assert(tuples.size() == 8);
- assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
-
- // Basic desc
- expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
- + "on=\"join1_i=join1_i, join2_s=join2_s\")");
- stream = new InnerJoinStream(expression, factory);
- tuples = getTuples(stream);
- assert(tuples.size() == 8);
- assertOrder(tuples, 7,3,4,5,1,1,15,15);
-
- // Results in both searches, no join matches
- expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
- + "on=\"ident_s=right.ident_s\")");
- stream = new InnerJoinStream(expression, factory);
- tuples = getTuples(stream);
- assert(tuples.size() == 0);
-
- // Differing field names
- expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
- + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
- + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
- stream = new InnerJoinStream(expression, factory);
- tuples = getTuples(stream);
-
- assert(tuples.size() == 8);
- assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+ try {
+ // Basic test
+ expression = StreamExpressionParser.parse("innerJoin("
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+ stream = new InnerJoinStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 8);
+ assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+
+ // Basic desc
+ expression = StreamExpressionParser.parse("innerJoin("
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ + "on=\"join1_i=join1_i, join2_s=join2_s\")");
+ stream = new InnerJoinStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 8);
+ assertOrder(tuples, 7, 3, 4, 5, 1, 1, 15, 15);
+
+ // Results in both searches, no join matches
+ expression = StreamExpressionParser.parse("innerJoin("
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ + "on=\"ident_s=right.ident_s\")");
+ stream = new InnerJoinStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assert (tuples.size() == 0);
+
+ // Differing field names
+ expression = StreamExpressionParser.parse("innerJoin("
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ + "on=\"join1_i=aliasesField, join2_s=join2_s\")");
+ stream = new InnerJoinStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 8);
+ assertOrder(tuples, 1, 1, 15, 15, 3, 4, 5, 7);
+ } finally {
+ solrClientCache.close();
+ }
}
@Test
@@ -2067,6 +2291,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, clus
<TRUNCATED>
[4/4] lucene-solr:branch_6x: SOLR-10274: fix precommit
Posted by jb...@apache.org.
SOLR-10274: fix precommit
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/56e3bc7a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/56e3bc7a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/56e3bc7a
Branch: refs/heads/branch_6x
Commit: 56e3bc7a54d1c0e1ec26f038e18fc87df2af559b
Parents: b03a7b1
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue Apr 11 15:36:03 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue Apr 11 22:41:49 2017 -0400
----------------------------------------------------------------------
.../apache/solr/client/solrj/io/stream/ParallelStream.java | 9 ---------
1 file changed, 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56e3bc7a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index def4c03..58ba248 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -18,14 +18,10 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -38,11 +34,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import static org.apache.solr.common.params.CommonParams.DISTRIB;