You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2017/05/19 00:14:02 UTC

[41/50] [abbrv] lucene-solr:jira/solr-10233: SOLR-10623: Add sql Streaming Expression

SOLR-10623: Add sql Streaming Expression


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f326cfb1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f326cfb1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f326cfb1

Branch: refs/heads/jira/solr-10233
Commit: f326cfb17569eb7607084f7fca1ae9aeff31e383
Parents: 5a50887
Author: Joel Bernstein <jb...@apache.org>
Authored: Thu May 18 12:38:22 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Thu May 18 12:50:59 2017 -0400

----------------------------------------------------------------------
 .../org/apache/solr/handler/StreamHandler.java  |   1 +
 .../solr/client/solrj/io/stream/SqlStream.java  | 280 +++++++++++++++++++
 .../solrj/io/stream/StreamExpressionTest.java   |  40 +++
 3 files changed, 321 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f326cfb1/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 99f5cc3..bf37383 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -167,6 +167,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       .withFunctionName("get", GetStream.class)
       .withFunctionName("timeseries", TimeSeriesStream.class)
       .withFunctionName("tuple", TupStream.class)
+      .withFunctionName("sql", SqlStream.class)
       .withFunctionName("col", ColumnEvaluator.class)
       .withFunctionName("predict", PredictEvaluator.class)
       .withFunctionName("regress", RegressionEvaluator.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f326cfb1/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
new file mode 100644
index 0000000..5a81b74
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.StrUtils;
+
+public class SqlStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  protected String zkHost;
+  protected String collection;
+  protected SolrParams params;
+  protected transient CloudSolrClient cloudSolrClient;
+  protected transient TupleStream tupleStream;
+  protected transient StreamContext streamContext;
+
+  /**
+   * @param zkHost         Zookeeper ensemble connection string
+   * @param collectionName Name of the collection to operate on
+   * @param params         Map&lt;String, String&gt; of parameter/value pairs
+   * @throws IOException Something went wrong
+   *                     <p>
+   *                     This form does not allow specifying multiple clauses, say "fq" clauses, use the form that
+   *                     takes a SolrParams. Transition code can call the preferred method that takes SolrParams
+   *                     by calling CloudSolrStream(zkHost, collectionName,
+   *                     new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(Map&lt;String, String&gt;)));
+   * @deprecated         Use the constructor that has a SolrParams obj rather than a Map
+   */
+  
+  public SqlStream(String zkHost, String collectionName, SolrParams params) throws IOException {
+    init(collectionName, zkHost, params);
+  }
+
+  public SqlStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
+    if(expression.getParameters().size() != 1 + namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if(0 == namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    ModifiableSolrParams mParams = new ModifiableSolrParams();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
+        mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    /*
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+    */
+
+    // We've got all the required items
+    init(collectionName, zkHost, mParams);
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    // parameters
+
+    ModifiableSolrParams mParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
+    for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
+      String value = String.join(",", param.getValue());
+
+      // SOLR-8409: This is a special case where the params contain a " character
+      // Do note that in any other BASE streams with parameters where a " might come into play
+      // that this same replacement needs to take place.
+      value = value.replace("\"", "\\\"");
+
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), value));
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
+    explanation.setExpression(toExpression(factory).toString());
+
+    // child is a datastore so add it at this point
+    StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
+    child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    child.setImplementingClass("Solr/Lucene");
+    child.setExpressionType(ExpressionType.DATASTORE);
+
+    if(null != params){
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+    }
+    explanation.addChild(child);
+
+    return explanation;
+  }
+
+  protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
+    this.zkHost = zkHost;
+    this.collection = collectionName;
+    this.params = new ModifiableSolrParams(params);
+
+    // If the comparator is null then it was not explicitly set so we will create one using the sort parameter
+    // of the query. While doing this we will also take into account any aliases such that if we are sorting on
+    // fieldA but fieldA is aliased to alias.fieldA then the comparater will be against alias.fieldA.
+
+    if (params.get("stmt") == null) {
+      throw new IOException("stmt param expected for search function");
+    }
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.streamContext = context;
+  }
+
+  /**
+   * Opens the CloudSolrStream
+   *
+   ***/
+  public void open() throws IOException {
+    constructStream();
+    tupleStream.open();
+  }
+
+  public List<TupleStream> children() {
+    return null;
+  }
+
+
+  public static Collection<Slice> getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
+
+    // Check collection case sensitive
+    if(collectionsMap.containsKey(collectionName)) {
+      return collectionsMap.get(collectionName).getActiveSlices();
+    }
+
+    // Check collection case insensitive
+    for(String collectionMapKey : collectionsMap.keySet()) {
+      if(collectionMapKey.equalsIgnoreCase(collectionName)) {
+        return collectionsMap.get(collectionMapKey).getActiveSlices();
+      }
+    }
+
+    if(checkAlias) {
+      // check for collection alias
+      Aliases aliases = zkStateReader.getAliases();
+      String alias = aliases.getCollectionAlias(collectionName);
+      if (alias != null) {
+        Collection<Slice> slices = new ArrayList<>();
+
+        List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+        for (String aliasCollectionName : aliasList) {
+          // Add all active slices for this alias collection
+          slices.addAll(collectionsMap.get(aliasCollectionName).getActiveSlices());
+        }
+
+        return slices;
+      }
+    }
+
+    throw new IOException("Slices not found for " + collectionName);
+  }
+
+  protected void constructStream() throws IOException {
+    try {
+
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
+      Collections.shuffle(shardUrls);
+      String url  = shardUrls.get(0);
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      mParams.add(CommonParams.QT, "/sql");
+      this.tupleStream = new SolrStream(url, mParams);
+      if(streamContext != null) {
+        tupleStream.setStreamContext(streamContext);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void close() throws IOException {
+    tupleStream.close();
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
+
+  public Tuple read() throws IOException {
+    return tupleStream.read();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f326cfb1/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 129de1c..e651338 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -242,6 +242,46 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     }
   }
 
+  @Test
+  public void testSqlStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+    try {
+      StringBuilder buf = new StringBuilder();
+      for (String shardUrl : shardUrls) {
+        if (buf.length() > 0) {
+          buf.append(",");
+        }
+        buf.append(shardUrl);
+      }
+
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "sql("+COLLECTIONORALIAS+", stmt=\"select id from collection1 order by a_i asc\")");
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 1, 2, 3, 4);
+
+    } finally {
+      solrClientCache.close();
+    }
+  }
+