You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/12/15 10:03:58 UTC
[06/34] lucene-solr:jira/http2: SOLR-13057: Allow search,
facet and timeseries Streaming Expressions to accept a comma
delimited list of collections
SOLR-13057: Allow search, facet and timeseries Streaming Expressions to accept a comma delimited list of collections
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7e4555a2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7e4555a2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7e4555a2
Branch: refs/heads/jira/http2
Commit: 7e4555a2fdb863d6aac2f785116f8f13e51bf16b
Parents: ce9a801
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Dec 12 09:15:41 2018 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Dec 12 09:16:08 2018 -0500
----------------------------------------------------------------------
.../client/solrj/io/stream/CloudSolrStream.java | 21 +++-
.../client/solrj/io/stream/FacetStream.java | 11 +-
.../solrj/io/stream/SearchFacadeStream.java | 6 ++
.../client/solrj/io/stream/SearchStream.java | 6 +-
.../solrj/io/stream/TimeSeriesStream.java | 11 +-
.../solrj/io/stream/StreamExpressionTest.java | 108 +++++++++++++++++++
6 files changed, 155 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 2cff0a7..f871473 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -170,7 +170,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression("search");
// collection
- expression.addParameter(collection);
+ if(collection.indexOf(',') > -1) {
+ expression.addParameter("\""+collection+"\"");
+ } else {
+ expression.addParameter(collection);
+ }
for (Entry<String, String[]> param : params.getMap().entrySet()) {
for (String val : param.getValue()) {
@@ -334,11 +338,18 @@ public class CloudSolrStream extends TupleStream implements Expressible {
// which is something already supported in other parts of Solr
// check for alias or collection
- List<String> collections = checkAlias
- ? zkStateReader.getAliases().resolveAliases(collectionName) // if not an alias, returns collectionName
- : Collections.singletonList(collectionName);
+
+ List<String> allCollections = new ArrayList();
+ String[] collectionNames = collectionName.split(",");
+ for(String col : collectionNames) {
+ List<String> collections = checkAlias
+ ? zkStateReader.getAliases().resolveAliases(col) // if not an alias, returns collectionName
+ : Collections.singletonList(collectionName);
+ allCollections.addAll(collections);
+ }
+
// Lookup all actives slices for these collections
- List<Slice> slices = collections.stream()
+ List<Slice> slices = allCollections.stream()
.map(collectionsMap::get)
.filter(Objects::nonNull)
.flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index b84967c..38d0904 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -100,6 +100,11 @@ public class FacetStream extends TupleStream implements Expressible {
public FacetStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out
String collectionName = factory.getValueOperand(expression, 0);
+
+ if(collectionName.indexOf('"') > -1) {
+ collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+ }
+
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter bucketExpression = factory.getNamedOperand(expression, "buckets");
StreamExpressionNamedParameter bucketSortExpression = factory.getNamedOperand(expression, "bucketSorts");
@@ -378,7 +383,11 @@ public class FacetStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// collection
- expression.addParameter(collection);
+ if(collection.indexOf(',') > -1) {
+ expression.addParameter("\""+collection+"\"");
+ } else {
+ expression.addParameter(collection);
+ }
// parameters
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
index 5e8b549..de2b20e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
@@ -48,6 +48,12 @@ public class SearchFacadeStream extends TupleStream implements Expressible {
public SearchFacadeStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out
String collectionName = factory.getValueOperand(expression, 0);
+
+ //Handle comma delimited list of collections.
+ if(collectionName.indexOf('"') > -1) {
+ collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+ }
+
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
index a4ed996..3643969 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -128,7 +128,11 @@ public class SearchStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression("search");
// collection
- expression.addParameter(collection);
+ if(collection.indexOf(',') > -1) {
+ expression.addParameter("\""+collection+"\"");
+ } else {
+ expression.addParameter(collection);
+ }
for (Entry<String, String[]> param : params.getMap().entrySet()) {
for (String val : param.getValue()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
index 13e72fa..764c70b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
@@ -89,6 +89,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
public TimeSeriesStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out
String collectionName = factory.getValueOperand(expression, 0);
+
+ if(collectionName.indexOf('"') > -1) {
+ collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+ }
+
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter startExpression = factory.getNamedOperand(expression, "start");
StreamExpressionNamedParameter endExpression = factory.getNamedOperand(expression, "end");
@@ -212,7 +217,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// collection
- expression.addParameter(collection);
+ if(collection.indexOf(',') > -1) {
+ expression.addParameter("\""+collection+"\"");
+ } else {
+ expression.addParameter(collection);
+ }
// parameters
ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 2725903..e271401 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -1400,6 +1400,114 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
@Test
+ public void testMultiCollection() throws Exception {
+
+ CollectionAdminRequest.createCollection("collection2", "conf", 2, 1).process(cluster.getSolrClient());
+ cluster.waitForActiveCollection("collection2", 2, 2);
+
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7")
+ .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77")
+ .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777")
+ .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777")
+ .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777")
+ .commit(cluster.getSolrClient(), "collection1");
+
+ new UpdateRequest()
+ .add(id, "10", "a_s", "hello", "a_i", "10", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7")
+ .add(id, "12", "a_s", "hello", "a_i", "12", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77")
+ .add(id, "13", "a_s", "hello", "a_i", "13", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777")
+ .add(id, "14", "a_s", "hello", "a_i", "14", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777")
+ .add(id, "11", "a_s", "hello", "a_i", "11", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777")
+ .commit(cluster.getSolrClient(), "collection2");
+
+
+ List<Tuple> tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+ List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+ try {
+ StringBuilder buf = new StringBuilder();
+ for (String shardUrl : shardUrls) {
+ if (buf.length() > 0) {
+ buf.append(",");
+ }
+ buf.append(shardUrl);
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", rows=50, sort=\"a_i asc\")");
+ SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ solrStream.setStreamContext(streamContext);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 10);
+ assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
+
+ //Test with export handler, different code path.
+
+ solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\")");
+ solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ solrStream.setStreamContext(streamContext);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 10);
+ assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
+
+
+ solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", "facet(\"collection1, collection2\", q=\"*:*\", buckets=\"a_s\", bucketSorts=\"count(*) asc\", count(*))");
+ solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ solrStream.setStreamContext(streamContext);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 1);
+ Tuple tuple = tuples.get(0);
+ assertEquals(tuple.getString("a_s"), "hello");
+ assertEquals(tuple.getLong("count(*)").longValue(), 10);
+
+ String expr = "timeseries(\"collection1, collection2\", q=\"*:*\", " +
+ "start=\"2016-01-01T01:00:00.000Z\", " +
+ "end=\"2016-12-01T01:00:00.000Z\", " +
+ "gap=\"+1YEAR\", " +
+ "field=\"test_dt\", " +
+ "format=\"yyyy\","+
+ "count(*))";
+
+ solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", expr);
+ solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ solrStream.setStreamContext(streamContext);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 1);
+ tuple = tuples.get(0);
+ assertEquals(tuple.getString("test_dt"), "2016");
+ assertEquals(tuple.getLong("count(*)").longValue(), 10);
+
+ //Test parallel
+
+ solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", "parallel(collection1, sort=\"a_i asc\", workers=2, search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\", partitionKeys=\"a_s\"))");
+ solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ solrStream.setStreamContext(streamContext);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 10);
+ assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
+
+ } finally {
+ CollectionAdminRequest.deleteCollection("collection2").process(cluster.getSolrClient());
+ solrClientCache.close();
+ }
+
+
+ }
+
+ @Test
public void testSubFacetStream() throws Exception {
new UpdateRequest()