You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/12/15 10:03:58 UTC

[06/34] lucene-solr:jira/http2: SOLR-13057: Allow search, facet and timeseries Streaming Expressions to accept a comma delimited list of collections

SOLR-13057: Allow search, facet and timeseries Streaming Expressions to accept a comma delimited list of collections


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7e4555a2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7e4555a2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7e4555a2

Branch: refs/heads/jira/http2
Commit: 7e4555a2fdb863d6aac2f785116f8f13e51bf16b
Parents: ce9a801
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Dec 12 09:15:41 2018 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Dec 12 09:16:08 2018 -0500

----------------------------------------------------------------------
 .../client/solrj/io/stream/CloudSolrStream.java |  21 +++-
 .../client/solrj/io/stream/FacetStream.java     |  11 +-
 .../solrj/io/stream/SearchFacadeStream.java     |   6 ++
 .../client/solrj/io/stream/SearchStream.java    |   6 +-
 .../solrj/io/stream/TimeSeriesStream.java       |  11 +-
 .../solrj/io/stream/StreamExpressionTest.java   | 108 +++++++++++++++++++
 6 files changed, 155 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 2cff0a7..f871473 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -170,7 +170,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     StreamExpression expression = new StreamExpression("search");
     
     // collection
-    expression.addParameter(collection);
+    if(collection.indexOf(',') > -1) {
+      expression.addParameter("\""+collection+"\"");
+    } else {
+      expression.addParameter(collection);
+    }
     
     for (Entry<String, String[]> param : params.getMap().entrySet()) {
       for (String val : param.getValue()) {
@@ -334,11 +338,18 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     //  which is something already supported in other parts of Solr
 
     // check for alias or collection
-    List<String> collections = checkAlias
-        ? zkStateReader.getAliases().resolveAliases(collectionName)  // if not an alias, returns collectionName
-        : Collections.singletonList(collectionName);
+    // Resolve every comma-delimited entry (collection or alias) on its own and pool the results.
+    List<String> allCollections = new ArrayList<>();
+    String[] collectionNames = collectionName.split(",");
+    for(String col : collectionNames) {
+      List<String> collections = checkAlias
+          ? zkStateReader.getAliases().resolveAliases(col)  // if not an alias, returns col
+          : Collections.singletonList(col);  // the single entry, not the whole comma-delimited string
+      allCollections.addAll(collections);
+    }
+
     // Lookup all actives slices for these collections
-    List<Slice> slices = collections.stream()
+    List<Slice> slices = allCollections.stream()
         .map(collectionsMap::get)
         .filter(Objects::nonNull)
         .flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index b84967c..38d0904 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -100,6 +100,11 @@ public class FacetStream extends TupleStream implements Expressible  {
   public FacetStream(StreamExpression expression, StreamFactory factory) throws IOException{   
     // grab all parameters out
     String collectionName = factory.getValueOperand(expression, 0);
+
+    if(collectionName.indexOf('"') > -1) {
+      collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+    }
+
     List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
     StreamExpressionNamedParameter bucketExpression = factory.getNamedOperand(expression, "buckets");
     StreamExpressionNamedParameter bucketSortExpression = factory.getNamedOperand(expression, "bucketSorts");
@@ -378,7 +383,11 @@ public class FacetStream extends TupleStream implements Expressible  {
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
     
     // collection
-    expression.addParameter(collection);
+    if(collection.indexOf(',') > -1) {
+      expression.addParameter("\""+collection+"\"");
+    } else {
+      expression.addParameter(collection);
+    }
     
     // parameters
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
index 5e8b549..de2b20e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchFacadeStream.java
@@ -48,6 +48,12 @@ public class SearchFacadeStream extends TupleStream implements Expressible {
   public SearchFacadeStream(StreamExpression expression, StreamFactory factory) throws IOException{
     // grab all parameters out
     String collectionName = factory.getValueOperand(expression, 0);
+
+    //Handle comma delimited list of collections.
+    if(collectionName.indexOf('"') > -1) {
+      collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+    }
+
     List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
     StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
index a4ed996..3643969 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -128,7 +128,11 @@ public class SearchStream extends TupleStream implements Expressible  {
     StreamExpression expression = new StreamExpression("search");
 
     // collection
-    expression.addParameter(collection);
+    if(collection.indexOf(',') > -1) {
+      expression.addParameter("\""+collection+"\"");
+    } else {
+      expression.addParameter(collection);
+    }
 
     for (Entry<String, String[]> param : params.getMap().entrySet()) {
       for (String val : param.getValue()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
index 13e72fa..764c70b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
@@ -89,6 +89,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible  {
   public TimeSeriesStream(StreamExpression expression, StreamFactory factory) throws IOException{
     // grab all parameters out
     String collectionName = factory.getValueOperand(expression, 0);
+
+    if(collectionName.indexOf('"') > -1) {
+      collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
+    }
+
     List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
     StreamExpressionNamedParameter startExpression = factory.getNamedOperand(expression, "start");
     StreamExpressionNamedParameter endExpression = factory.getNamedOperand(expression, "end");
@@ -212,7 +217,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible  {
     // function name
     StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
     // collection
-    expression.addParameter(collection);
+    if(collection.indexOf(',') > -1) {
+      expression.addParameter("\""+collection+"\"");
+    } else {
+      expression.addParameter(collection);
+    }
 
     // parameters
     ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7e4555a2/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 2725903..e271401 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -1400,6 +1400,114 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void testMultiCollection() throws Exception {
+    // Exercises search, facet, timeseries and parallel(search) against the quoted list "collection1, collection2".
+    CollectionAdminRequest.createCollection("collection2", "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection("collection2", 2, 2);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection1");
+
+    new UpdateRequest()
+        .add(id, "10", "a_s", "hello", "a_i", "10", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7")
+        .add(id, "12", "a_s", "hello", "a_i", "12", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77")
+        .add(id, "13", "a_s", "hello", "a_i", "13", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"),  "i_multi", "444", "i_multi", "777")
+        .add(id, "14", "a_s", "hello", "a_i", "14", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777")
+        .add(id, "11", "a_s", "hello", "a_i", "11", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection2");
+
+    // Ten documents are now indexed in total: ids 0-4 in collection1 and ids 10-14 in collection2.
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+    try {
+      StringBuilder buf = new StringBuilder();
+      for (String shardUrl : shardUrls) {
+        if (buf.length() > 0) {
+          buf.append(",");
+        }
+        buf.append(shardUrl);
+      }
+      // NOTE(review): buf is built but never read in this method - candidate for removal.
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", rows=50, sort=\"a_i asc\")");
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
+
+      //Test with export handler, different code path.
+
+      solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\")");
+      solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
+
+      //Test facet: every document has a_s=hello, so one bucket counting all ten is expected.
+      solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "facet(\"collection1, collection2\", q=\"*:*\", buckets=\"a_s\", bucketSorts=\"count(*) asc\", count(*))");
+      solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assertEquals(1, tuples.size());
+      Tuple tuple = tuples.get(0);
+      assertEquals("hello", tuple.getString("a_s"));
+      assertEquals(10L, tuple.getLong("count(*)").longValue());
+
+      String expr = "timeseries(\"collection1, collection2\", q=\"*:*\", " +
+              "start=\"2016-01-01T01:00:00.000Z\", " +
+              "end=\"2016-12-01T01:00:00.000Z\", " +
+              "gap=\"+1YEAR\", " +
+              "field=\"test_dt\", " +
+              "format=\"yyyy\","+
+              "count(*))";
+
+      solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", expr);
+      solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assertEquals(1, tuples.size());
+      tuple = tuples.get(0);
+      assertEquals("2016", tuple.getString("test_dt"));
+      assertEquals(10L, tuple.getLong("count(*)").longValue());
+
+      //Test parallel
+
+      solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "parallel(collection1, sort=\"a_i asc\", workers=2, search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\", partitionKeys=\"a_s\"))");
+      solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assertEquals(10, tuples.size());
+      assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
+
+    } finally {
+      CollectionAdminRequest.deleteCollection("collection2").process(cluster.getSolrClient());
+      solrClientCache.close();
+    }
+
+
+  }
+
+  @Test
   public void testSubFacetStream() throws Exception {
 
     new UpdateRequest()