You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/31 05:06:54 UTC

[08/50] [abbrv] lucene-solr:jira/http2_benchmark: SOLR-12829: Add plist (parallel list) Streaming Expression

SOLR-12829: Add plist (parallel list) Streaming Expression


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/fcaea07f
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/fcaea07f
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/fcaea07f

Branch: refs/heads/jira/http2_benchmark
Commit: fcaea07f3c8cba34906ca02f40fb1d2c40badc08
Parents: c9776d8
Author: Joel Bernstein <jb...@apache.org>
Authored: Mon Oct 22 15:20:13 2018 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Mon Oct 22 15:20:13 2018 -0400

----------------------------------------------------------------------
 .../org/apache/solr/client/solrj/io/Lang.java   |   1 +
 .../solr/client/solrj/io/stream/ListStream.java |   4 +
 .../solrj/io/stream/ParallelListStream.java     | 205 +++++++++++++++++++
 .../solr/client/solrj/io/stream/TupStream.java  |  89 ++++----
 .../apache/solr/client/solrj/io/TestLang.java   |   2 +-
 .../solrj/io/stream/MathExpressionTest.java     |   8 +-
 6 files changed, 260 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 7cc842f..2be48e3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -90,6 +90,7 @@ public class Lang {
         .withFunctionName("timeseries", TimeSeriesStream.class)
         .withFunctionName("tuple", TupStream.class)
         .withFunctionName("sql", SqlStream.class)
+        .withFunctionName("plist", ParallelListStream.class)
 
             // metrics
         .withFunctionName("min", MinMetric.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
index 826e948..33f8fd5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
@@ -108,6 +108,10 @@ public class ListStream extends TupleStream implements Expressible {
       if (currentStream == null) {
         if (streamIndex < streams.length) {
           currentStream = streams[streamIndex];
+          // Set the stream to null in the array of streams once its been set to the current stream.
+          // This will remove the reference to the stream
+          // and should allow it to be garbage collected once it's no longer the current stream.
+          streams[streamIndex] = null;
           currentStream.open();
         } else {
           HashMap map = new HashMap();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
new file mode 100644
index 0000000..ef02ffa
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+
+public class ParallelListStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+  private TupleStream[] streams;
+  private TupleStream currentStream;
+  private int streamIndex;
+
+  public ParallelListStream(TupleStream... streams) throws IOException {
+    init(streams);
+  }
+
+  public ParallelListStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    TupleStream[] streams = new TupleStream[streamExpressions.size()];
+    for(int idx = 0; idx < streamExpressions.size(); ++idx){
+      streams[idx] = factory.constructStream(streamExpressions.get(idx));
+    }
+
+    init(streams);
+  }
+
+  private void init(TupleStream ... tupleStreams) {
+    this.streams = tupleStreams;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    if(includeStreams) {
+      for(TupleStream stream : streams) {
+        expression.addParameter(((Expressible)stream).toExpression(factory));
+      }
+    }
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    explanation.setExpression(toExpression(factory, false).toString());
+    for(TupleStream stream : streams) {
+      explanation.addChild(stream.toExplanation(factory));
+    }
+
+    return explanation;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    for(TupleStream stream : streams) {
+      stream.setStreamContext(context);
+    }
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList<TupleStream>();
+    for(TupleStream stream : streams) {
+      l.add(stream);
+    }
+    return l;
+  }
+
+  public Tuple read() throws IOException {
+    while(true) {
+      if (currentStream == null) {
+        if (streamIndex < streams.length) {
+          currentStream = streams[streamIndex];
+        } else {
+          HashMap map = new HashMap();
+          map.put("EOF", true);
+          return new Tuple(map);
+        }
+      }
+
+      Tuple tuple = currentStream.read();
+      if (tuple.EOF) {
+        currentStream.close();
+        currentStream = null;
+        ++streamIndex;
+      } else {
+        return tuple;
+      }
+    }
+  }
+
+  public void close() throws IOException {
+  }
+
+  public void open() throws IOException {
+    openStreams();
+  }
+
+  private void openStreams() throws IOException {
+    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("ParallelListStream"));
+    try {
+      List<Future<StreamIndex>> futures = new ArrayList();
+      int i=0;
+      for (TupleStream tupleStream : streams) {
+        StreamOpener so = new StreamOpener(new StreamIndex(tupleStream, i++));
+        Future<StreamIndex> future = service.submit(so);
+        futures.add(future);
+      }
+
+      try {
+        for (Future<StreamIndex> f : futures) {
+          StreamIndex streamIndex = f.get();
+          this.streams[streamIndex.getIndex()] = streamIndex.getTupleStream();
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } finally {
+      service.shutdown();
+    }
+  }
+
+  protected class StreamOpener implements Callable<StreamIndex> {
+
+    private StreamIndex streamIndex;
+
+    public StreamOpener(StreamIndex streamIndex) {
+      this.streamIndex = streamIndex;
+    }
+
+    public StreamIndex call() throws Exception {
+      streamIndex.getTupleStream().open();
+      return streamIndex;
+    }
+  }
+
+  protected class StreamIndex {
+    private TupleStream tupleStream;
+    private int index;
+
+    public StreamIndex(TupleStream tupleStream, int index) {
+      this.tupleStream = tupleStream;
+      this.index = index;
+    }
+
+    public int getIndex() {
+      return this.index;
+    }
+
+    public TupleStream getTupleStream() {
+      return this.tupleStream;
+    }
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index c87dc24..fde8298 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -51,6 +51,7 @@ public class TupStream extends TupleStream implements Expressible {
   private Map<String,TupleStream> streamParams = new HashMap<>();
   private List<String> fieldNames = new ArrayList();
   private Map<String, String> fieldLabels = new HashMap();
+  private Tuple tup = null;
   
   private boolean finished;
 
@@ -151,50 +152,6 @@ public class TupStream extends TupleStream implements Expressible {
       return new Tuple(m);
     } else {
       finished = true;
-      Map<String, Object> values = new HashMap<>();
-      
-      // add all string based params
-      // these could come from the context, or they will just be treated as straight strings
-      for(Entry<String,String> param : stringParams.entrySet()){
-        if(streamContext.getLets().containsKey(param.getValue())){
-          values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
-        }
-        else{
-          values.put(param.getKey(), param.getValue());
-        }
-      }
-      
-      // add all evaluators
-      for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
-        values.put(param.getKey(), param.getValue().evaluateOverContext());
-      }
-      
-      // Add all streams
-      for(Entry<String,TupleStream> param : streamParams.entrySet()){
-        
-        try{
-          List<Tuple> streamTuples = new ArrayList();
-          // open the stream, closed in finally block
-          param.getValue().open();
-          
-          // read all values from stream (memory expensive)
-          Tuple streamTuple = param.getValue().read();
-          while(!streamTuple.EOF){
-            streamTuples.add(streamTuple);
-            streamTuple = param.getValue().read();
-          }
-          
-          values.put(param.getKey(), streamTuples);
-        }
-        finally{
-          // safely close the stream
-          param.getValue().close();
-        }        
-      }
-
-      Tuple tup = new Tuple(values);
-      tup.fieldNames = fieldNames;
-      tup.fieldLabels = fieldLabels;
       return tup;
     }
   }
@@ -204,6 +161,50 @@ public class TupStream extends TupleStream implements Expressible {
   }
 
   public void open() throws IOException {
+    Map<String, Object> values = new HashMap<>();
+
+    // add all string based params
+    // these could come from the context, or they will just be treated as straight strings
+    for(Entry<String,String> param : stringParams.entrySet()){
+      if(streamContext.getLets().containsKey(param.getValue())){
+        values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
+      }
+      else{
+        values.put(param.getKey(), param.getValue());
+      }
+    }
+
+    // add all evaluators
+    for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+      values.put(param.getKey(), param.getValue().evaluateOverContext());
+    }
+
+    // Add all streams
+    for(Entry<String,TupleStream> param : streamParams.entrySet()){
+
+      try{
+        List<Tuple> streamTuples = new ArrayList();
+        // open the stream, closed in finally block
+        param.getValue().open();
+
+        // read all values from stream (memory expensive)
+        Tuple streamTuple = param.getValue().read();
+        while(!streamTuple.EOF){
+          streamTuples.add(streamTuple);
+          streamTuple = param.getValue().read();
+        }
+
+        values.put(param.getKey(), streamTuples);
+      }
+      finally{
+        // safely close the stream
+        param.getValue().close();
+      }
+    }
+
+    this.tup = new Tuple(values);
+    tup.fieldNames = fieldNames;
+    tup.fieldLabels = fieldLabels;
     // nothing to do here
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 07b0938..e06b973 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -73,7 +73,7 @@ public class TestLang extends LuceneTestCase {
       "outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
       "convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
       "getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
-      "getSupportPoints", "pairSort", "log10"};
+      "getSupportPoints", "pairSort", "log10", "plist"};
 
   @Test
   public void testLang() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index 78fc2ce..2bff1ab 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -235,7 +235,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
   @Test
   public void testMemsetSize() throws Exception {
     String expr = "let(echo=\"b, c\"," +
-        "              a=memset(list(tuple(field1=val(1), field2=val(10)), tuple(field1=val(2), field2=val(20))), " +
+        "              a=memset(plist(tuple(field1=val(1), field2=val(10)), tuple(field1=val(2), field2=val(20))), " +
         "                       cols=\"field1, field2\", " +
         "                       vars=\"f1, f2\"," +
         "                       size=1)," +
@@ -1974,7 +1974,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     //Test exclude. This should drop off the term jim
 
     cexpr = "let(echo=true," +
-        "        a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+        "        a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
         "                      tuple(id=\"2\", text=\"hello steve\"), " +
         "                      tuple(id=\"3\", text=\"hello jim jim\"), " +
         "                      tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +
@@ -2046,7 +2046,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     //Test minDocFreq attribute at .5. This should eliminate all but the term hello
 
     cexpr = "let(echo=true," +
-        "a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+        "a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
         "tuple(id=\"2\", text=\"hello steve\"), " +
         "tuple(id=\"3\", text=\"hello jim jim\"), " +
         "tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +
@@ -2100,7 +2100,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     //Test maxDocFreq attribute at 0. This should eliminate all terms
 
     cexpr = "let(echo=true," +
-        "a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+        "a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
         "tuple(id=\"2\", text=\"hello steve\"), " +
         "tuple(id=\"3\", text=\"hello jim jim\"), " +
         "tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +