You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/31 05:06:54 UTC
[08/50] [abbrv] lucene-solr:jira/http2_benchmark: SOLR-12829: Add
plist (parallel list) Streaming Expression
SOLR-12829: Add plist (parallel list) Streaming Expression
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/fcaea07f
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/fcaea07f
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/fcaea07f
Branch: refs/heads/jira/http2_benchmark
Commit: fcaea07f3c8cba34906ca02f40fb1d2c40badc08
Parents: c9776d8
Author: Joel Bernstein <jb...@apache.org>
Authored: Mon Oct 22 15:20:13 2018 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Mon Oct 22 15:20:13 2018 -0400
----------------------------------------------------------------------
.../org/apache/solr/client/solrj/io/Lang.java | 1 +
.../solr/client/solrj/io/stream/ListStream.java | 4 +
.../solrj/io/stream/ParallelListStream.java | 205 +++++++++++++++++++
.../solr/client/solrj/io/stream/TupStream.java | 89 ++++----
.../apache/solr/client/solrj/io/TestLang.java | 2 +-
.../solrj/io/stream/MathExpressionTest.java | 8 +-
6 files changed, 260 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 7cc842f..2be48e3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -90,6 +90,7 @@ public class Lang {
.withFunctionName("timeseries", TimeSeriesStream.class)
.withFunctionName("tuple", TupStream.class)
.withFunctionName("sql", SqlStream.class)
+ .withFunctionName("plist", ParallelListStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
index 826e948..33f8fd5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
@@ -108,6 +108,10 @@ public class ListStream extends TupleStream implements Expressible {
if (currentStream == null) {
if (streamIndex < streams.length) {
currentStream = streams[streamIndex];
+ // Set the stream to null in the array of streams once its been set to the current stream.
+ // This will remove the reference to the stream
+ // and should allow it to be garbage collected once it's no longer the current stream.
+ streams[streamIndex] = null;
currentStream.open();
} else {
HashMap map = new HashMap();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
new file mode 100644
index 0000000..ef02ffa
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+
+public class ParallelListStream extends TupleStream implements Expressible {
+
+ private static final long serialVersionUID = 1;
+ private TupleStream[] streams;
+ private TupleStream currentStream;
+ private int streamIndex;
+
+ public ParallelListStream(TupleStream... streams) throws IOException {
+ init(streams);
+ }
+
+ public ParallelListStream(StreamExpression expression, StreamFactory factory) throws IOException {
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+ TupleStream[] streams = new TupleStream[streamExpressions.size()];
+ for(int idx = 0; idx < streamExpressions.size(); ++idx){
+ streams[idx] = factory.constructStream(streamExpressions.get(idx));
+ }
+
+ init(streams);
+ }
+
+ private void init(TupleStream ... tupleStreams) {
+ this.streams = tupleStreams;
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException{
+ return toExpression(factory, true);
+ }
+
+ private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+ if(includeStreams) {
+ for(TupleStream stream : streams) {
+ expression.addParameter(((Expressible)stream).toExpression(factory));
+ }
+ }
+ return expression;
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+ StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+ explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+ explanation.setImplementingClass(this.getClass().getName());
+ explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+ explanation.setExpression(toExpression(factory, false).toString());
+ for(TupleStream stream : streams) {
+ explanation.addChild(stream.toExplanation(factory));
+ }
+
+ return explanation;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ for(TupleStream stream : streams) {
+ stream.setStreamContext(context);
+ }
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList<TupleStream>();
+ for(TupleStream stream : streams) {
+ l.add(stream);
+ }
+ return l;
+ }
+
+ public Tuple read() throws IOException {
+ while(true) {
+ if (currentStream == null) {
+ if (streamIndex < streams.length) {
+ currentStream = streams[streamIndex];
+ } else {
+ HashMap map = new HashMap();
+ map.put("EOF", true);
+ return new Tuple(map);
+ }
+ }
+
+ Tuple tuple = currentStream.read();
+ if (tuple.EOF) {
+ currentStream.close();
+ currentStream = null;
+ ++streamIndex;
+ } else {
+ return tuple;
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void open() throws IOException {
+ openStreams();
+ }
+
+ private void openStreams() throws IOException {
+ ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("ParallelListStream"));
+ try {
+ List<Future<StreamIndex>> futures = new ArrayList();
+ int i=0;
+ for (TupleStream tupleStream : streams) {
+ StreamOpener so = new StreamOpener(new StreamIndex(tupleStream, i++));
+ Future<StreamIndex> future = service.submit(so);
+ futures.add(future);
+ }
+
+ try {
+ for (Future<StreamIndex> f : futures) {
+ StreamIndex streamIndex = f.get();
+ this.streams[streamIndex.getIndex()] = streamIndex.getTupleStream();
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } finally {
+ service.shutdown();
+ }
+ }
+
+ protected class StreamOpener implements Callable<StreamIndex> {
+
+ private StreamIndex streamIndex;
+
+ public StreamOpener(StreamIndex streamIndex) {
+ this.streamIndex = streamIndex;
+ }
+
+ public StreamIndex call() throws Exception {
+ streamIndex.getTupleStream().open();
+ return streamIndex;
+ }
+ }
+
+ protected class StreamIndex {
+ private TupleStream tupleStream;
+ private int index;
+
+ public StreamIndex(TupleStream tupleStream, int index) {
+ this.tupleStream = tupleStream;
+ this.index = index;
+ }
+
+ public int getIndex() {
+ return this.index;
+ }
+
+ public TupleStream getTupleStream() {
+ return this.tupleStream;
+ }
+ }
+
+ /** Return the stream sort - ie, the order in which records are returned */
+ public StreamComparator getStreamSort(){
+ return null;
+ }
+
+ public int getCost() {
+ return 0;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index c87dc24..fde8298 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -51,6 +51,7 @@ public class TupStream extends TupleStream implements Expressible {
private Map<String,TupleStream> streamParams = new HashMap<>();
private List<String> fieldNames = new ArrayList();
private Map<String, String> fieldLabels = new HashMap();
+ private Tuple tup = null;
private boolean finished;
@@ -151,50 +152,6 @@ public class TupStream extends TupleStream implements Expressible {
return new Tuple(m);
} else {
finished = true;
- Map<String, Object> values = new HashMap<>();
-
- // add all string based params
- // these could come from the context, or they will just be treated as straight strings
- for(Entry<String,String> param : stringParams.entrySet()){
- if(streamContext.getLets().containsKey(param.getValue())){
- values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
- }
- else{
- values.put(param.getKey(), param.getValue());
- }
- }
-
- // add all evaluators
- for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
- values.put(param.getKey(), param.getValue().evaluateOverContext());
- }
-
- // Add all streams
- for(Entry<String,TupleStream> param : streamParams.entrySet()){
-
- try{
- List<Tuple> streamTuples = new ArrayList();
- // open the stream, closed in finally block
- param.getValue().open();
-
- // read all values from stream (memory expensive)
- Tuple streamTuple = param.getValue().read();
- while(!streamTuple.EOF){
- streamTuples.add(streamTuple);
- streamTuple = param.getValue().read();
- }
-
- values.put(param.getKey(), streamTuples);
- }
- finally{
- // safely close the stream
- param.getValue().close();
- }
- }
-
- Tuple tup = new Tuple(values);
- tup.fieldNames = fieldNames;
- tup.fieldLabels = fieldLabels;
return tup;
}
}
@@ -204,6 +161,50 @@ public class TupStream extends TupleStream implements Expressible {
}
public void open() throws IOException {
+ Map<String, Object> values = new HashMap<>();
+
+ // add all string based params
+ // these could come from the context, or they will just be treated as straight strings
+ for(Entry<String,String> param : stringParams.entrySet()){
+ if(streamContext.getLets().containsKey(param.getValue())){
+ values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
+ }
+ else{
+ values.put(param.getKey(), param.getValue());
+ }
+ }
+
+ // add all evaluators
+ for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+ values.put(param.getKey(), param.getValue().evaluateOverContext());
+ }
+
+ // Add all streams
+ for(Entry<String,TupleStream> param : streamParams.entrySet()){
+
+ try{
+ List<Tuple> streamTuples = new ArrayList();
+ // open the stream, closed in finally block
+ param.getValue().open();
+
+ // read all values from stream (memory expensive)
+ Tuple streamTuple = param.getValue().read();
+ while(!streamTuple.EOF){
+ streamTuples.add(streamTuple);
+ streamTuple = param.getValue().read();
+ }
+
+ values.put(param.getKey(), streamTuples);
+ }
+ finally{
+ // safely close the stream
+ param.getValue().close();
+ }
+ }
+
+ this.tup = new Tuple(values);
+ tup.fieldNames = fieldNames;
+ tup.fieldLabels = fieldLabels;
// nothing to do here
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 07b0938..e06b973 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -73,7 +73,7 @@ public class TestLang extends LuceneTestCase {
"outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
"convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
"getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
- "getSupportPoints", "pairSort", "log10"};
+ "getSupportPoints", "pairSort", "log10", "plist"};
@Test
public void testLang() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fcaea07f/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index 78fc2ce..2bff1ab 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -235,7 +235,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
@Test
public void testMemsetSize() throws Exception {
String expr = "let(echo=\"b, c\"," +
- " a=memset(list(tuple(field1=val(1), field2=val(10)), tuple(field1=val(2), field2=val(20))), " +
+ " a=memset(plist(tuple(field1=val(1), field2=val(10)), tuple(field1=val(2), field2=val(20))), " +
" cols=\"field1, field2\", " +
" vars=\"f1, f2\"," +
" size=1)," +
@@ -1974,7 +1974,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
//Test exclude. This should drop off the term jim
cexpr = "let(echo=true," +
- " a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+ " a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
" tuple(id=\"2\", text=\"hello steve\"), " +
" tuple(id=\"3\", text=\"hello jim jim\"), " +
" tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +
@@ -2046,7 +2046,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
//Test minDocFreq attribute at .5. This should eliminate all but the term hello
cexpr = "let(echo=true," +
- "a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+ "a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
"tuple(id=\"2\", text=\"hello steve\"), " +
"tuple(id=\"3\", text=\"hello jim jim\"), " +
"tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +
@@ -2100,7 +2100,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
//Test maxDocFreq attribute at 0. This should eliminate all terms
cexpr = "let(echo=true," +
- "a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+ "a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
"tuple(id=\"2\", text=\"hello steve\"), " +
"tuple(id=\"3\", text=\"hello jim jim\"), " +
"tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +