You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by kr...@apache.org on 2016/11/16 02:34:39 UTC
[2/3] lucene-solr:master: SOLR-9077: Streaming expressions should support collection alias
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ace423e9/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index d447210..ff5a062 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -50,6 +50,7 @@ import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -64,12 +65,12 @@ import org.junit.Test;
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class StreamExpressionTest extends SolrCloudTestCase {
- private static final String COLLECTION = "collection1";
-
- private static final int TIMEOUT = 30;
-
+ private static final String COLLECTIONORALIAS = "collection1";
+ private static final int TIMEOUT = DEFAULT_TIMEOUT;
private static final String id = "id";
+ private static boolean useAlias;
+
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
@@ -77,8 +78,17 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.addConfig("ml", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("ml").resolve("conf"))
.configure();
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+ String collection;
+ useAlias = random().nextBoolean();
+ if(useAlias) {
+ collection = COLLECTIONORALIAS + "_collection";
+ CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
+ } else {
+ collection = COLLECTIONORALIAS;
+ }
+
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
@@ -86,7 +96,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
@Test
@@ -98,15 +108,15 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- StreamFactory factory = new StreamFactory().withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress());
+ StreamFactory factory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress());
StreamExpression expression;
CloudSolrStream stream;
List<Tuple> tuples;
// Basic test
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -115,7 +125,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertLong(tuples.get(0), "a_i", 0);
// Basic w/aliases
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -125,7 +135,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertString(tuples.get(0), "name", "hello0");
// Basic filtered test
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -134,7 +144,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertLong(tuples.get(1), "a_i", 3);
try {
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
throw new Exception("Should be an exception here");
@@ -143,7 +153,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
try {
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=\"blah\", sort=\"a_f asc, a_i asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
throw new Exception("Should be an exception here");
@@ -162,7 +172,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory();
StreamExpression expression;
@@ -170,7 +180,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
List<Tuple> tuples;
// Basic test
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -179,7 +189,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertLong(tuples.get(0), "a_i", 0);
// Basic w/aliases
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -189,7 +199,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertString(tuples.get(0), "name", "hello0");
// Basic filtered test
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost="
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost="
+ cluster.getZkServer().getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -228,9 +238,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION;
+ String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
List<Tuple> tuples;
TupleStream stream;
@@ -241,8 +251,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
+ "${q2},"
+ "on=${mySort})");
sParams.set(CommonParams.QT, "/stream");
- sParams.set("q1", "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
- sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
+ sParams.set("q1", "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
+ sParams.set("q2", "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
sParams.set("mySort", "a_f asc");
stream = new SolrStream(url, sParams);
tuples = getTuples(stream);
@@ -259,7 +269,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertOrder(tuples, 4, 3, 1, 0);
// Basic w/ multi comp
- sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
+ sParams.set("q2", "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
sParams.set("mySort", "\"a_f asc, a_s asc\"");
stream = new SolrStream(url, sParams);
tuples = getTuples(stream);
@@ -277,19 +287,19 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class);
// Basic test
- expression = StreamExpressionParser.parse("unique(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
+ expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")");
stream = new UniqueStream(expression, factory);
tuples = getTuples(stream);
@@ -297,7 +307,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertOrder(tuples, 0, 1, 3, 4);
// Basic test desc
- expression = StreamExpressionParser.parse("unique(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
+ expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")");
stream = new UniqueStream(expression, factory);
tuples = getTuples(stream);
@@ -305,7 +315,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertOrder(tuples, 4,3,1,2);
// Basic w/multi comp
- expression = StreamExpressionParser.parse("unique(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+ expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
stream = new UniqueStream(expression, factory);
tuples = getTuples(stream);
@@ -313,7 +323,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertOrder(tuples, 0,2,1,3,4);
// full factory w/multi comp
- stream = factory.constructStream("unique(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
+ stream = factory.constructStream("unique(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
tuples = getTuples(stream);
assert(tuples.size() == 5);
@@ -331,31 +341,31 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.add(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("sort", SortStream.class);
// Basic test
- stream = factory.constructStream("sort(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
+ stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
tuples = getTuples(stream);
assert(tuples.size() == 6);
assertOrder(tuples, 0,1,5,2,3,4);
// Basic test desc
- stream = factory.constructStream("sort(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
+ stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
tuples = getTuples(stream);
assert(tuples.size() == 6);
assertOrder(tuples, 4,3,2,1,5,0);
// Basic w/multi comp
- stream = factory.constructStream("sort(search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
+ stream = factory.constructStream("sort(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
tuples = getTuples(stream);
assert(tuples.size() == 6);
assertOrder(tuples, 0,5,1,2,3,4);
@@ -371,17 +381,17 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "4", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "2", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
Tuple tuple;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class);
// Basic test
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -405,7 +415,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertNotNull(longs);
//test sort (asc) with null string field. Null should sort to the top.
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -413,7 +423,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertOrder(tuples, 0, 1, 2, 3, 4);
//test sort(desc) with null string field. Null should sort to the bottom.
- expression = StreamExpressionParser.parse("search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
+ expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
@@ -431,22 +441,22 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("merge", MergeStream.class);
// Basic test
expression = StreamExpressionParser.parse("merge("
- + "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
- + "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "on=\"a_f asc\")");
stream = new MergeStream(expression, factory);
tuples = getTuples(stream);
@@ -456,8 +466,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Basic test desc
expression = StreamExpressionParser.parse("merge("
- + "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
- + "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ "on=\"a_f desc\")");
stream = new MergeStream(expression, factory);
tuples = getTuples(stream);
@@ -467,8 +477,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Basic w/multi comp
expression = StreamExpressionParser.parse("merge("
- + "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")");
stream = new MergeStream(expression, factory);
tuples = getTuples(stream);
@@ -478,8 +488,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// full factory w/multi comp
stream = factory.constructStream("merge("
- + "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")");
tuples = getTuples(stream);
@@ -488,9 +498,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// full factory w/multi streams
stream = factory.constructStream("merge("
- + "search(" + COLLECTION + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
- + "search(" + COLLECTION + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc\")");
tuples = getTuples(stream);
@@ -508,14 +518,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class);
@@ -523,7 +533,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Basic test
expression = StreamExpressionParser.parse("top("
+ "n=3,"
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ "sort=\"a_f asc, a_i asc\")");
stream = new RankStream(expression, factory);
tuples = getTuples(stream);
@@ -535,7 +545,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
expression = StreamExpressionParser.parse("top("
+ "n=2,"
+ "unique("
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ "over=\"a_f\"),"
+ "sort=\"a_f desc\")");
stream = new RankStream(expression, factory);
@@ -548,7 +558,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
stream = factory.constructStream("top("
+ "n=4,"
+ "unique("
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+ "over=\"a_f\"),"
+ "sort=\"a_f asc\")");
tuples = getTuples(stream);
@@ -560,7 +570,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
stream = factory.constructStream("top("
+ "n=4,"
+ "unique("
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+ "over=\"a_f\"),"
+ "sort=\"a_f asc\")");
tuples = getTuples(stream);
@@ -578,13 +588,13 @@ public class StreamExpressionTest extends SolrCloudTestCase {
String idxString = new Integer(idx).toString();
update.add(id,idxString, "a_s", "hello" + idxString, "a_i", idxString, "a_f", idxString);
}
- update.commit(cluster.getSolrClient(), COLLECTION);
+ update.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("random", RandomStream.class);
@@ -593,13 +603,13 @@ public class StreamExpressionTest extends SolrCloudTestCase {
try {
context.setSolrClientCache(cache);
- expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
+ expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
stream = factory.constructStream(expression);
stream.setStreamContext(context);
List<Tuple> tuples1 = getTuples(stream);
assert (tuples1.size() == 1000);
- expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
+ expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
stream = factory.constructStream(expression);
stream.setStreamContext(context);
List<Tuple> tuples2 = getTuples(stream);
@@ -628,7 +638,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
}
- expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
+ expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
stream = factory.constructStream(expression);
stream.setStreamContext(context);
List<Tuple> tuples3 = getTuples(stream);
@@ -653,7 +663,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
@@ -662,14 +672,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
List<Map> maps0, maps1, maps2;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class);
// basic
expression = StreamExpressionParser.parse("reduce("
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s\","
+ "group(sort=\"a_f desc\", n=\"4\"))");
@@ -693,7 +703,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// basic w/spaces
expression = StreamExpressionParser.parse("reduce("
- + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s\"," +
"group(sort=\"a_i asc\", n=\"2\"))");
stream = factory.constructStream(expression);
@@ -733,17 +743,17 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
.add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
.add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("fetch", FetchStream.class);
- stream = factory.constructStream("fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\")");
+ stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\")");
StreamContext context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -772,7 +782,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertTrue("blah blah blah 9".equals(t.getString("subject")));
//Change the batch size
- stream = factory.constructStream("fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
+ stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
context = new StreamContext();
context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context);
@@ -816,18 +826,18 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7")
.add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8")
.add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("fetch", FetchStream.class);
- stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
tuples = getTuples(stream);
assert(tuples.size() == 10);
@@ -853,7 +863,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertTrue("blah blah blah 9".equals(t.getString("subject")));
- stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+ stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
tuples = getTuples(stream);
assert(tuples.size() == 10);
@@ -898,10 +908,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("sum", SumMetric.class)
@@ -915,7 +925,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
DaemonStream daemonStream;
expression = StreamExpressionParser.parse("daemon(rollup("
- + "search(" + COLLECTION + ", q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+ "over=\"a_s\","
+ "sum(a_i)"
+ "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
@@ -965,7 +975,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Now let's clear the existing docs in the queue 9, plus 3 more to get past the run that was blocked. The next run should
//have the tuples with the updated count.
@@ -1006,6 +1016,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testTerminatingDaemonStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
new UpdateRequest()
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
@@ -1018,10 +1029,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("topic", TopicStream.class)
.withFunctionName("daemon", DaemonStream.class);
@@ -1031,7 +1042,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
SolrClientCache cache = new SolrClientCache();
StreamContext context = new StreamContext();
context.setSolrClientCache(cache);
- expression = StreamExpressionParser.parse("daemon(topic("+COLLECTION+","+COLLECTION+", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
+ expression = StreamExpressionParser.parse("daemon(topic("+ COLLECTIONORALIAS +","+ COLLECTIONORALIAS +", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
+ "), id=test, runInterval=1000, terminate=true, queueSize=50)");
daemonStream = (DaemonStream)factory.constructStream(expression);
daemonStream.setStreamContext(context);
@@ -1056,10 +1067,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("sum", SumMetric.class)
@@ -1073,7 +1084,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
List<Tuple> tuples;
expression = StreamExpressionParser.parse("rollup("
- + "search(" + COLLECTION + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\"),"
+ "over=\"a_s\","
+ "sum(a_i),"
+ "sum(a_f),"
@@ -1177,10 +1188,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("stats", StatsStream.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
@@ -1238,17 +1249,17 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
.add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
.add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String zkHost = cluster.getZkServer().getZkAddress();
- StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTION, zkHost)
+ StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTION + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
+ ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 5);
@@ -1275,18 +1286,18 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String zkHost = cluster.getZkServer().getZkAddress();
- StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTION, zkHost)
+ StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTION + ", " +
+ ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
"reduce(" +
- "search(" + COLLECTION + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
"by=\"a_s\"," +
"group(sort=\"a_i asc\", n=\"5\")), " +
"workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
@@ -1308,9 +1319,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertMaps(maps2, 4, 6);
- pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTION + ", " +
+ pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " +
"reduce(" +
- "search(" + COLLECTION + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
+ "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
"by=\"a_s\", " +
"group(sort=\"a_i desc\", n=\"5\")),"+
"workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
@@ -1349,10 +1360,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1")
.add(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1")
.add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String zkHost = cluster.getZkServer().getZkAddress();
- StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTION, zkHost)
+ StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
@@ -1360,9 +1371,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("parallel", ParallelStream.class);
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel("
- + COLLECTION + ", "
+ + COLLECTIONORALIAS + ", "
+ "top("
- + "search(" + COLLECTION + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+ + "search(" + COLLECTIONORALIAS + ", q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
+ "n=\"11\", "
+ "sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
@@ -1387,10 +1398,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
.add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
.add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String zkHost = cluster.getZkServer().getZkAddress();
- StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTION, zkHost)
+ StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
@@ -1399,7 +1410,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.withFunctionName("parallel", ParallelStream.class);
//Test ascending
- ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTION + ", merge(search(" + COLLECTION + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTION + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i asc\")");
+ ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i asc\")");
List<Tuple> tuples = getTuples(pstream);
@@ -1410,7 +1421,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
//Test descending
- pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTION + ", merge(search(" + COLLECTION + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTION + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
+ pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 9)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i desc\", partitionKeys=\"a_i\"), on=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
tuples = getTuples(pstream);
@@ -1433,10 +1444,10 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
@@ -1450,9 +1461,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
TupleStream stream;
List<Tuple> tuples;
- expression = StreamExpressionParser.parse("parallel(" + COLLECTION + ","
+ expression = StreamExpressionParser.parse("parallel(" + COLLECTIONORALIAS + ","
+ "rollup("
- + "search(" + COLLECTION + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\"),"
+ "over=\"a_s\","
+ "sum(a_i),"
+ "sum(a_f),"
@@ -1564,21 +1575,21 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
.add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
.add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class);
// Basic test
expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1587,8 +1598,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Basic desc
expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1597,8 +1608,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Results in both searches, no join matches
expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ "on=\"ident_s=right.ident_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1606,8 +1617,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Differing field names
expression = StreamExpressionParser.parse("innerJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ "on=\"join1_i=aliasesField, join2_s=join2_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1637,21 +1648,21 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
.add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
.add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class);
// Basic test
expression = StreamExpressionParser.parse("leftOuterJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1660,8 +1671,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Basic desc
expression = StreamExpressionParser.parse("leftOuterJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1670,8 +1681,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Results in both searches, no join matches
expression = StreamExpressionParser.parse("leftOuterJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ "on=\"ident_s=right.ident_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1680,8 +1691,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Differing field names
expression = StreamExpressionParser.parse("leftOuterJoin("
- + "search(" + COLLECTION + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
- + "search(" + COLLECTION + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ + "search(" + COLLECTIONORALIAS + ", q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ "on=\"join1_i=aliasesField, join2_s=join2_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
@@ -1710,14 +1721,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
.add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
.add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("hashJoin", HashJoinStream.class);
@@ -1784,7 +1795,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
.add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
.add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
@@ -1856,7 +1867,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1") // 5
.add(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2")
.add(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3") // 7
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String clause;
TupleStream stream;
@@ -1972,7 +1983,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String clause;
TupleStream stream;
@@ -2398,7 +2409,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8")
.add(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9")
.add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String clause;
TupleStream stream;
@@ -2573,11 +2584,11 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertTrue(bucket2.equals("a"));
assertTrue(sumi.longValue() == 2);
assertTrue(count.doubleValue() == 2);
-
}
@Test
public void testTopicStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
new UpdateRequest()
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
@@ -2590,7 +2601,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -2635,7 +2646,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
expression = StreamExpressionParser.parse("topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2)");
@@ -2702,7 +2713,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Start reading from the DaemonStream
Tuple tuple = null;
@@ -2718,7 +2729,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "14", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "15", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Read from the same DaemonStream stream
@@ -2738,10 +2749,11 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
}
-
@Test
public void testParallelTopicStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
+
new UpdateRequest()
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1", "subject", "ha ha bla blah0")
.add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2", "subject", "ha ha bla blah2")
@@ -2753,7 +2765,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8", "subject", "ha ha bla blah8")
.add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9", "subject", "ha ha bla blah9")
.add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10", "subject", "ha ha bla blah10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -2811,7 +2823,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
expression = StreamExpressionParser.parse("parallel(collection1, " +
"workers=\"2\", " +
@@ -2854,7 +2866,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Run the same topic again including the initialCheckpoint. It should start where it left off.
//initialCheckpoint should be ignored for all but the first run.
@@ -3244,6 +3256,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testParallelTerminatingDaemonUpdateStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
@@ -3709,7 +3722,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "setAB", "a_i", "0")
.add(id, "8", "a_s", "setAB", "a_i", "6")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
@@ -3735,6 +3748,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testClassifyStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
+
CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("modelCollection", cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
@@ -3752,14 +3767,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
updateRequest.add(id, String.valueOf(i+1), "tv_text", "a b e e f", "out_i", "0");
}
- updateRequest.commit(cluster.getSolrClient(), COLLECTION);
+ updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
updateRequest = new UpdateRequest();
updateRequest.add(id, String.valueOf(0), "text_s", "a b c c d");
updateRequest.add(id, String.valueOf(1), "text_s", "a b e e f");
updateRequest.commit(cluster.getSolrClient(), "uknownCollection");
- String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION;
+ String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream updateTrainModelStream;
ModifiableSolrParams paramsLoc;
@@ -3817,14 +3832,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
// Train another model
updateRequest = new UpdateRequest();
updateRequest.deleteByQuery("*:*");
- updateRequest.commit(cluster.getSolrClient(), COLLECTION);
+ updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
updateRequest = new UpdateRequest();
for (int i = 0; i < 500; i+=2) {
updateRequest.add(id, String.valueOf(i), "tv_text", "a b c c d", "out_i", "0");
updateRequest.add(id, String.valueOf(i+1), "tv_text", "a b e e f", "out_i", "1");
}
- updateRequest.commit(cluster.getSolrClient(), COLLECTION);
+ updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
updateTrainModelStream = factory.constructStream("update(modelCollection, batchSize=5, "+textLogitExpression+")");
getTuples(updateTrainModelStream);
cluster.getSolrClient().commit("modelCollection");
@@ -4018,6 +4033,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testBasicTextLogitStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
+
CollectionAdminRequest.createCollection("destinationCollection", "ml", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("destinationCollection", cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
@@ -4027,7 +4044,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
updateRequest.add(id, String.valueOf(i), "tv_text", "a b c c d", "out_i", "1");
updateRequest.add(id, String.valueOf(i+1), "tv_text", "a b e e f", "out_i", "0");
}
- updateRequest.commit(cluster.getSolrClient(), COLLECTION);
+ updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
@@ -4144,7 +4161,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "setAB", "a_i", "0")
.add(id, "8", "a_s", "setAB", "a_i", "6")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory streamFactory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -4170,6 +4187,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testFeaturesSelectionStream() throws Exception {
+ Assume.assumeTrue(!useAlias);
CollectionAdminRequest.createCollection("destinationCollection", "ml", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("destinationCollection", cluster.getSolrClient().getZkStateReader(),
@@ -4180,7 +4198,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
updateRequest.add(id, String.valueOf(i), "whitetok", "a b c d", "out_i", "1");
updateRequest.add(id, String.valueOf(i+1), "whitetok", "a b e f", "out_i", "0");
}
- updateRequest.commit(cluster.getSolrClient(), COLLECTION);
+ updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
@@ -4239,7 +4257,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "setAB", "a_i", "0")
.add(id, "8", "a_s", "setAB", "a_i", "6")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
@@ -4278,7 +4296,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "setAB", "a_i", "0")
.add(id, "8", "a_s", "setAB", "a_i", "6")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory streamFactory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())