You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/10/27 01:24:53 UTC

[1/3] lucene-solr:branch_6x: SOLR-9533: Reload core config when a core is reloaded

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x bbc318947 -> 0eb62c01b


SOLR-9533: Reload core config when a core is reloaded


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9758603b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9758603b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9758603b

Branch: refs/heads/branch_6x
Commit: 9758603b81dfc3cf880bc8d40d5692177b61ae17
Parents: bbc3189
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 26 17:38:13 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 26 20:46:00 2016 -0400

----------------------------------------------------------------------
 .../src/java/org/apache/solr/core/SolrCore.java |  4 +-
 .../solr/core/TestCorePropertiesReload.java     | 71 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9758603b/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 9c06acc..7130c23 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -585,9 +585,11 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     boolean success = false;
     SolrCore core = null;
     try {
+      CoreDescriptor cd = new CoreDescriptor(coreDescriptor.getName(), coreDescriptor);
+      cd.loadExtraProperties(); //Reload the extra properties
       core = new SolrCore(getName(), getDataDir(), coreConfig.getSolrConfig(),
           coreConfig.getIndexSchema(), coreConfig.getProperties(),
-          coreDescriptor, updateHandler, solrDelPolicy, currentCore);
+          cd, updateHandler, solrDelPolicy, currentCore);
       
       // we open a new IndexWriter to pick up the latest config
       core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9758603b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
new file mode 100644
index 0000000..bb7aaa0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCorePropertiesReload extends SolrTestCaseJ4 {
+
+  private final File solrHomeDirectory = createTempDir().toFile();
+
+  public void setMeUp() throws Exception {
+    FileUtils.copyDirectory(new File(TEST_HOME()), solrHomeDirectory);
+    Properties props = new Properties();
+    props.setProperty("test", "Before reload");
+    writeProperties(props);
+    initCore("solrconfig.xml", "schema.xml", solrHomeDirectory.getAbsolutePath());
+  }
+
+  @Test
+  public void testPropertiesReload() throws Exception {
+    setMeUp();
+    SolrCore core = h.getCore();
+    CoreDescriptor coreDescriptor = core.getCoreDescriptor();
+    String testProp = coreDescriptor.getCoreProperty("test", null);
+    assertTrue(testProp.equals("Before reload"));
+
+    //Re-write the properties file
+    Properties props = new Properties();
+    props.setProperty("test", "After reload");
+    writeProperties(props);
+
+    h.reload();
+    core = h.getCore();
+    coreDescriptor = core.getCoreDescriptor();
+
+    testProp = coreDescriptor.getCoreProperty("test", null);
+    assertTrue(testProp.equals("After reload"));
+  }
+
+  private void writeProperties(Properties props) throws Exception {
+    FileWriter out = null;
+    try {
+      File confDir = new File(new File(solrHomeDirectory, "collection1"), "conf");
+      out = new FileWriter(new File(confDir, "solrcore.properties"));
+      props.store(out, "Reload Test");
+    } finally {
+      out.close();
+    }
+  }
+}


[3/3] lucene-solr:branch_6x: SOLR-9533: Fix precommit

Posted by jb...@apache.org.
SOLR-9533: Fix precommit


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0eb62c01
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0eb62c01
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0eb62c01

Branch: refs/heads/branch_6x
Commit: 0eb62c01b29e13b360b97aa28a991e4b5e694aa7
Parents: 67938c2
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 26 20:08:28 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 26 20:48:38 2016 -0400

----------------------------------------------------------------------
 .../org/apache/solr/core/TestCorePropertiesReload.java   | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0eb62c01/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
index bb7aaa0..6e1f768 100644
--- a/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
+++ b/solr/core/src/test/org/apache/solr/core/TestCorePropertiesReload.java
@@ -17,12 +17,14 @@
 package org.apache.solr.core;
 
 import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.BufferedWriter;
+import java.io.Writer;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.SolrTestCaseJ4;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCorePropertiesReload extends SolrTestCaseJ4 {
@@ -59,11 +61,12 @@ public class TestCorePropertiesReload extends SolrTestCaseJ4 {
   }
 
   private void writeProperties(Properties props) throws Exception {
-    FileWriter out = null;
+    Writer out = null;
     try {
       File confDir = new File(new File(solrHomeDirectory, "collection1"), "conf");
-      out = new FileWriter(new File(confDir, "solrcore.properties"));
+      out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(confDir, "solrcore.properties")), "UTF8"));
       props.store(out, "Reload Test");
+
     } finally {
       out.close();
     }


[2/3] lucene-solr:branch_6x: SOLR-9559: Add ExecutorStream to execute stored Streaming Expressions

Posted by jb...@apache.org.
SOLR-9559: Add ExecutorStream to execute stored Streaming Expressions


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/67938c2b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/67938c2b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/67938c2b

Branch: refs/heads/branch_6x
Commit: 67938c2bab9011e1049763368897645a1bf9209f
Parents: 9758603
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 26 17:39:59 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 26 20:48:06 2016 -0400

----------------------------------------------------------------------
 .../org/apache/solr/handler/StreamHandler.java  |  32 +--
 .../client/solrj/io/stream/ExecutorStream.java  | 224 +++++++++++++++++++
 .../solrj/io/stream/StreamExpressionTest.java   | 152 ++++++++++++-
 3 files changed, 372 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67938c2b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 0b42192..6e00efc 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -35,36 +35,7 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
 import org.apache.solr.client.solrj.io.ops.DistinctOperation;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
 import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.CommitStream;
-import org.apache.solr.client.solrj.io.stream.ComplementStream;
-import org.apache.solr.client.solrj.io.stream.DaemonStream;
-import org.apache.solr.client.solrj.io.stream.ExceptionStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
-import org.apache.solr.client.solrj.io.stream.FetchStream;
-import org.apache.solr.client.solrj.io.stream.HashJoinStream;
-import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
-import org.apache.solr.client.solrj.io.stream.IntersectStream;
-import org.apache.solr.client.solrj.io.stream.JDBCStream;
-import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
-import org.apache.solr.client.solrj.io.stream.MergeStream;
-import org.apache.solr.client.solrj.io.stream.ModelStream;
-import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
-import org.apache.solr.client.solrj.io.stream.ParallelStream;
-import org.apache.solr.client.solrj.io.stream.RankStream;
-import org.apache.solr.client.solrj.io.stream.ReducerStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
-import org.apache.solr.client.solrj.io.stream.SelectStream;
-import org.apache.solr.client.solrj.io.stream.SortStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TextLogitStream;
-import org.apache.solr.client.solrj.io.stream.TopicStream;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
-import org.apache.solr.client.solrj.io.stream.UniqueStream;
-import org.apache.solr.client.solrj.io.stream.UpdateStream;
+import org.apache.solr.client.solrj.io.stream.*;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
 import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@@ -168,6 +139,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
          .withFunctionName("model", ModelStream.class)
          .withFunctionName("classify", ClassifyStream.class)
              .withFunctionName("fetch", FetchStream.class)
+      .withFunctionName("executor", ExecutorStream.class)
 
       // metrics
       .withFunctionName("min", MinMetric.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67938c2b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java
new file mode 100644
index 0000000..6765f72
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  The executor function wraps a stream with Tuples containing Streaming Expressions
+ *  and executes them in parallel. Sample syntax:
+ *
+ *  executor(thread=10, topic(storedExpressions, q="*:*", fl="expr_s, id", id="topic1"))
+ *
+ *  The Streaming Expression to execute is taken from the expr field in the Tuples.
+ */
+
+public class ExecutorStream extends TupleStream implements Expressible {
+
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private TupleStream stream;
+
+  private int threads;
+
+  private ExecutorService executorService;
+  private StreamFactory streamFactory;
+  private StreamContext streamContext;
+
+  public ExecutorStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    StreamExpressionNamedParameter threadsParam = factory.getNamedOperand(expression, "threads");
+
+    int threads = 6;
+
+    if(threadsParam != null)  {
+      threads = Integer.parseInt(((StreamExpressionValue)threadsParam.getParameter()).getValue());
+    }
+
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    TupleStream stream = factory.constructStream(streamExpressions.get(0));
+    init(stream, threads, factory);
+  }
+
+  private void init(TupleStream tupleStream, int threads, StreamFactory factory) throws IOException{
+    this.threads = threads;
+    this.stream = tupleStream;
+    this.streamFactory = factory;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(new StreamExpressionNamedParameter("threads", Integer.toString(threads)));
+
+    // stream
+    if(includeStreams) {
+      if (stream instanceof Expressible) {
+        expression.addParameter(((Expressible) stream).toExpression(factory));
+      } else {
+        throw new IOException("The ExecuteStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+      }
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    return new StreamExplanation(getStreamNodeId().toString())
+        .withChildren(new Explanation[]{
+            stream.toExplanation(factory)
+        })
+        .withFunctionName(factory.getFunctionName(this.getClass()))
+        .withImplementingClass(this.getClass().getName())
+        .withExpressionType(ExpressionType.STREAM_DECORATOR)
+        .withExpression(toExpression(factory, false).toString());
+  }
+
+  public void setStreamContext(StreamContext streamContext) {
+    this.streamContext = streamContext;
+    this.stream.setStreamContext(streamContext);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList();
+    l.add(stream);
+    return l;
+  }
+
+  public void open() throws IOException {
+    executorService = ExecutorUtil.newMDCAwareFixedThreadPool(threads, new SolrjNamedThreadFactory("ExecutorStream"));
+    stream.open();
+  }
+
+  public void close() throws IOException {
+    stream.close();
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    } catch(InterruptedException e) {
+      logger.error("Interrupted while waiting for termination", e);
+    }
+  }
+
+  public Tuple read() throws IOException {
+    ArrayBlockingQueue<Tuple> queue = new ArrayBlockingQueue(10000);
+    while(true) {
+      Tuple tuple = stream.read();
+      if (!tuple.EOF) {
+        try {
+          queue.put(tuple);
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+        executorService.execute(new StreamTask(queue, streamFactory, streamContext));
+      } else {
+        return tuple;
+      }
+    }
+  }
+
+  public StreamComparator getStreamSort(){
+    return stream.getStreamSort();
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+  public static class StreamTask implements Runnable {
+
+    private ArrayBlockingQueue<Tuple> queue;
+    private StreamFactory streamFactory;
+    private StreamContext streamContext;
+
+    public StreamTask(ArrayBlockingQueue queue, StreamFactory streamFactory, StreamContext streamContext) {
+      this.queue = queue;
+      this.streamFactory = streamFactory;
+      this.streamContext = new StreamContext();
+      this.streamContext.setSolrClientCache(streamContext.getSolrClientCache());
+      this.streamContext.setModelCache(streamContext.getModelCache());
+    }
+
+    public void run() {
+      Tuple tuple = null;
+      try {
+        tuple = queue.take();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      String expr = tuple.getString("expr_s");
+      Object id = tuple.get("id");
+      TupleStream stream = null;
+
+      try {
+        stream = streamFactory.constructStream(expr);
+        stream.setStreamContext(streamContext);
+        stream.open();
+        while (true) {
+          Tuple t = stream.read();
+          if (t.EOF) {
+            break;
+          }
+        }
+      } catch (Exception e) {
+        logger.error("Executor Error: id="+id+" expr_s="+expr, e);
+      } finally {
+        try {
+          stream.close();
+        } catch (Exception e1) {
+          logger.error("Executor Error", e1);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67938c2b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 7b5777d..106368e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -256,7 +256,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(stream);
 
     assertEquals(4, tuples.size());
-    assertOrder(tuples, 4,3,1,0);
+    assertOrder(tuples, 4, 3, 1, 0);
 
     // Basic w/ multi comp
     sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
@@ -522,14 +522,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     
     // Basic test
     expression = StreamExpressionParser.parse("top("
-                                              + "n=3,"
-                                              + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-                                              + "sort=\"a_f asc, a_i asc\")");
+        + "n=3,"
+        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+        + "sort=\"a_f asc, a_i asc\")");
     stream = new RankStream(expression, factory);
     tuples = getTuples(stream);
     
     assert(tuples.size() == 3);
-    assertOrder(tuples, 0,2,1);
+    assertOrder(tuples, 0, 2, 1);
 
     // Basic test desc
     expression = StreamExpressionParser.parse("top("
@@ -3794,7 +3794,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     paramsLoc = new ModifiableSolrParams();
     paramsLoc.set("expr", expr);
-    paramsLoc.set("qt","/stream");
+    paramsLoc.set("qt", "/stream");
     SolrStream classifyStream = new SolrStream(url, paramsLoc);
     Map<String, Double> idToLabel = getIdToLabel(classifyStream, "probability_d");
     assertEquals(idToLabel.size(), 2);
@@ -3866,6 +3866,146 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     CollectionAdminRequest.deleteCollection("checkpointCollection").process(cluster.getSolrClient());
   }
 
+  @Test
+  public void testExecutorStream() throws Exception {
+    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("mainCorpus", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destination", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    UpdateRequest workRequest = new UpdateRequest();
+    UpdateRequest dataRequest = new UpdateRequest();
+
+
+    for (int i = 0; i < 500; i++) {
+      workRequest.add(id, String.valueOf(i), "expr_s", "update(destination, batchSize=50, search(mainCorpus, q=id:"+i+", rows=1, sort=\"id asc\", fl=\"id, body_t, field_i\"))");
+      dataRequest.add(id, String.valueOf(i), "body_t", "hello world "+i, "field_i", Integer.toString(i));
+    }
+
+    workRequest.commit(cluster.getSolrClient(), "workQueue");
+    dataRequest.commit(cluster.getSolrClient(), "mainCorpus");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/destination";
+    TupleStream executorStream;
+    ModifiableSolrParams paramsLoc;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("workQueue", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("mainCorpus", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("destination", cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("executor", ExecutorStream.class)
+        .withFunctionName("update", UpdateStream.class);
+
+    String executorExpression = "executor(threads=3, search(workQueue, q=\"*:*\", fl=\"id, expr_s\", rows=1000, sort=\"id desc\"))";
+    executorStream = factory.constructStream(executorExpression);
+
+    StreamContext context = new StreamContext();
+    SolrClientCache clientCache = new SolrClientCache();
+    context.setSolrClientCache(clientCache);
+    executorStream.setStreamContext(context);
+    getTuples(executorStream);
+    //Destination collection should now contain all the records in the main corpus.
+    cluster.getSolrClient().commit("destination");
+    paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", "search(destination, q=\"*:*\", fl=\"id, body_t, field_i\", rows=1000, sort=\"field_i asc\")");
+    paramsLoc.set("qt","/stream");
+
+    SolrStream solrStream = new SolrStream(url, paramsLoc);
+    List<Tuple> tuples = getTuples(solrStream);
+    assertTrue(tuples.size() == 500);
+    for(int i=0; i<500; i++) {
+      Tuple tuple = tuples.get(i);
+      long ivalue = tuple.getLong("field_i");
+      String body = tuple.getString("body_t");
+      assertTrue(ivalue == i);
+      assertTrue(body.equals("hello world "+i));
+    }
+
+    solrStream.close();
+    clientCache.close();
+    CollectionAdminRequest.deleteCollection("workQueue").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("mainCorpus").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("destination").process(cluster.getSolrClient());
+  }
+
+
+  @Test
+  public void testParallelExecutorStream() throws Exception {
+    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("mainCorpus", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destination", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    UpdateRequest workRequest = new UpdateRequest();
+    UpdateRequest dataRequest = new UpdateRequest();
+
+
+    for (int i = 0; i < 500; i++) {
+      workRequest.add(id, String.valueOf(i), "expr_s", "update(destination, batchSize=50, search(mainCorpus, q=id:"+i+", rows=1, sort=\"id asc\", fl=\"id, body_t, field_i\"))");
+      dataRequest.add(id, String.valueOf(i), "body_t", "hello world "+i, "field_i", Integer.toString(i));
+    }
+
+    workRequest.commit(cluster.getSolrClient(), "workQueue");
+    dataRequest.commit(cluster.getSolrClient(), "mainCorpus");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/destination";
+    TupleStream executorStream;
+    ModifiableSolrParams paramsLoc;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("workQueue", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("mainCorpus", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("destination", cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("executor", ExecutorStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("update", UpdateStream.class);
+
+    String executorExpression = "parallel(workQueue, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))";
+    executorStream = factory.constructStream(executorExpression);
+
+    StreamContext context = new StreamContext();
+    SolrClientCache clientCache = new SolrClientCache();
+    context.setSolrClientCache(clientCache);
+    executorStream.setStreamContext(context);
+    getTuples(executorStream);
+    //Destination collection should now contain all the records in the main corpus.
+    cluster.getSolrClient().commit("destination");
+    paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", "search(destination, q=\"*:*\", fl=\"id, body_t, field_i\", rows=1000, sort=\"field_i asc\")");
+    paramsLoc.set("qt","/stream");
+
+    SolrStream solrStream = new SolrStream(url, paramsLoc);
+    List<Tuple> tuples = getTuples(solrStream);
+    assertTrue(tuples.size() == 500);
+    for(int i=0; i<500; i++) {
+      Tuple tuple = tuples.get(i);
+      long ivalue = tuple.getLong("field_i");
+      String body = tuple.getString("body_t");
+      assertTrue(ivalue == i);
+      assertTrue(body.equals("hello world " + i));
+    }
+
+    solrStream.close();
+    clientCache.close();
+    CollectionAdminRequest.deleteCollection("workQueue").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("mainCorpus").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("destination").process(cluster.getSolrClient());
+  }
+
+
+
   private Map<String,Double> getIdToLabel(TupleStream stream, String outField) throws IOException {
     Map<String, Double> idToLabel = new HashMap<>();
     List<Tuple> tuples = getTuples(stream);