You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2016/10/14 15:43:29 UTC

lucene-solr:branch_6x: SOLR-8487: Adds CommitStream to support sending commits to a collection being updated

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 5e6a8e7c7 -> edde43359


SOLR-8487: Adds CommitStream to support sending commits to a collection being updated


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/edde4335
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/edde4335
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/edde4335

Branch: refs/heads/branch_6x
Commit: edde433594c104668137350d9db640180b04f648
Parents: 5e6a8e7
Author: Dennis Gove <dp...@gmail.com>
Authored: Fri Oct 14 11:43:13 2016 -0400
Committer: Dennis Gove <dp...@gmail.com>
Committed: Fri Oct 14 11:43:13 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/handler/StreamHandler.java  |   2 +
 .../client/solrj/io/stream/CommitStream.java    | 260 ++++++++++++++
 .../client/solrj/io/stream/UpdateStream.java    |   3 +-
 .../solrj/io/stream/expr/StreamFactory.java     |  35 ++
 .../solrj/io/stream/StreamExpressionTest.java   | 340 +++++++++++++++++++
 6 files changed, 641 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/edde4335/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 90205a6..d500d15 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,8 @@ New Features
 
 * SOLR-9103: Restore ability for users to add custom Streaming Expressions (Cao Manh Dat)
 
+* SOLR-8487: Adds CommitStream to support sending commits to a collection being updated (Dennis Gove)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/edde4335/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 9fc8ade..ee28598 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.ops.DistinctOperation;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
 import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
 import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.CommitStream;
 import org.apache.solr.client.solrj.io.stream.ComplementStream;
 import org.apache.solr.client.solrj.io.stream.DaemonStream;
 import org.apache.solr.client.solrj.io.stream.ExceptionStream;
@@ -137,6 +138,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       .withFunctionName("update", UpdateStream.class)
       .withFunctionName("jdbc", JDBCStream.class)
       .withFunctionName("topic", TopicStream.class)
+      .withFunctionName("commit", CommitStream.class)
       
       // decorator streams
       .withFunctionName("merge", MergeStream.class)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/edde4335/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
new file mode 100644
index 0000000..c075978
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends a commit message to a SolrCloud collection
+ */
+public class CommitStream extends TupleStream implements Expressible {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Part of expression / passed in
+  private String collection;
+  private String zkHost;
+  private boolean waitFlush;
+  private boolean waitSearcher;
+  private boolean softCommit;
+  private int commitBatchSize;
+  private TupleStream tupleSource;
+  
+  private transient SolrClientCache clientCache;
+  private long docsSinceCommit;
+
+  public CommitStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // Read the positional collection name plus the named options, which fall back to 0 / false.
+    String targetCollection = factory.getValueOperand(expression, 0);
+    String resolvedZkHost = findZkHost(factory, targetCollection, expression);
+    int commitEvery = factory.getIntOperand(expression, "batchSize", 0);
+    boolean flushWait = factory.getBooleanOperand(expression, "waitFlush", false);
+    boolean searcherWait = factory.getBooleanOperand(expression, "waitSearcher", false);
+    boolean soft = factory.getBooleanOperand(expression, "softCommit", false);
+
+    if(targetCollection == null){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+    if(resolvedZkHost == null){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,targetCollection));
+    }
+    if(commitEvery < 0){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize cannot be less than 0 but is '%d'",expression,commitEvery));
+    }
+
+    // Exactly one nested stream expression must be present; it becomes the wrapped source.
+    List<StreamExpression> nested = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    if (nested.size() != 1) {
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, nested.size()));
+    }
+    TupleStream source = factory.constructStream(nested.get(0));
+    
+    init(targetCollection, source, resolvedZkHost, commitEvery, flushWait, searcherWait, soft);
+  }
+  
+  public CommitStream(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) throws IOException {
+    if (batchSize < 0) {
+      throw new IOException(String.format(Locale.ROOT,"batchSize '%d' cannot be less than 0.", batchSize));
+    }
+    init(collectionName, tupleSource, zkHost, batchSize, waitFlush, waitSearcher, softCommit);
+  }
+  
+  private void init(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) {
+    this.collection = collectionName;
+    this.zkHost = zkHost;
+    this.commitBatchSize = batchSize;
+    this.waitFlush = waitFlush;
+    this.waitSearcher = waitSearcher;
+    this.softCommit = softCommit;
+    this.tupleSource = tupleSource;
+  }
+  
+  @Override
+  public void open() throws IOException {
+    tupleSource.open();
+    clientCache = new SolrClientCache(); // NOTE(review): unconditional - this also replaces a cache installed earlier by setStreamContext(); confirm intended
+    docsSinceCommit = 0; // a freshly opened stream has seen no documents yet
+  }
+  
+  @Override
+  public Tuple read() throws IOException {
+    
+    Tuple tuple = tupleSource.read();
+    if(tuple.EOF){
+      if(docsSinceCommit > 0){
+        sendCommit();
+      }
+    }
+    else{
+      // if the read document contains field 'batchIndexed' then it's a summary
+      // document and we can update our count based on it's value. If not then 
+      // just increment by 1
+      if(tuple.fields.containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){
+        docsSinceCommit += Integer.parseInt(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME));
+      }
+      else{
+        docsSinceCommit += 1;
+      }
+      
+      if(commitBatchSize > 0 && docsSinceCommit >= commitBatchSize){
+        // if commitBatchSize == 0 then the tuple.EOF above will end up calling sendCommit()
+        sendCommit();
+      }
+    }
+    
+    return tuple;
+  }
+  
+  private boolean isInteger(String string){
+    try{
+      Integer.parseInt(string);
+      return true;
+    }
+    catch(NumberFormatException e){
+      return false;
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if(null != clientCache){ clientCache.close(); } // still null when open() failed before creating it and no context supplied one
+    tupleSource.close(); // the wrapped source is always closed, even if this stream was never fully opened
+  }
+  
+  @Override
+  public StreamComparator getStreamSort() {
+    return tupleSource.getStreamSort();
+  }
+  
+  @Override
+  public List<TupleStream> children() {
+    ArrayList<TupleStream> sourceList = new ArrayList<TupleStream>(1);
+    sourceList.add(tupleSource);
+    return sourceList;
+  }
+  
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+  
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // Emits: <function>(collection, zkHost=, batchSize=, waitFlush=, waitSearcher=, softCommit=, <source>)
+    StreamExpression result = new StreamExpression(factory.getFunctionName(this.getClass()));
+    result.addParameter(collection);
+    result.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+    result.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(commitBatchSize)));
+    result.addParameter(new StreamExpressionNamedParameter("waitFlush", Boolean.toString(waitFlush)));
+    result.addParameter(new StreamExpressionNamedParameter("waitSearcher", Boolean.toString(waitSearcher)));
+    result.addParameter(new StreamExpressionNamedParameter("softCommit", Boolean.toString(softCommit)));
+
+    if(!includeStreams){
+      // toExplanation() passes false and gets a placeholder instead of the source expression
+      result.addParameter("<stream>");
+      return result;
+    }
+    if(!(tupleSource instanceof Expressible)){
+      throw new IOException("This CommitStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+    result.addParameter(((Expressible)tupleSource).toExpression(factory));
+    
+    return result;
+  }
+  
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    // Returns a two-level explanation: a DATASTORE node for the target collection whose
+    // only child describes this stream (which in turn holds the source's explanation).
+    // That is backward with respect to the usual order: this stream is the "child"
+    // while the collection we're committing to is the parent.
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore");
+    
+    explanation.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    explanation.setImplementingClass("Solr/Lucene");
+    explanation.setExpressionType(ExpressionType.DATASTORE);
+    explanation.setExpression("Commit into " + collection);
+    
+    // child is a stream so add it at this point
+    StreamExplanation child = new StreamExplanation(getStreamNodeId().toString());
+    // the function name is plain text, not a format pattern, so it is passed through as-is
+    child.setFunctionName(factory.getFunctionName(getClass()));
+    child.setImplementingClass(getClass().getName());
+    child.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    child.setExpression(toExpression(factory, false).toString());
+    child.addChild(tupleSource.toExplanation(factory));
+    explanation.addChild(child);
+    
+    return explanation;
+  }
+  
+  @Override
+  public void setStreamContext(StreamContext context) {
+    if(null != context.getSolrClientCache()){
+      this.clientCache = context.getSolrClientCache();
+        // NOTE(review): this only survives if open() is not called afterwards - open() assigns a new SolrClientCache unconditionally
+    }
+    // the wrapped source receives the same context
+    this.tupleSource.setStreamContext(context);
+  }
+  
+  private String findZkHost(StreamFactory factory, String collectionName, StreamExpression expression) {
+    // Precedence: explicit zkHost= operand, then the host the factory has for the collection, then the factory default.
+    StreamExpressionNamedParameter explicit = factory.getNamedOperand(expression, "zkHost");
+    if(explicit != null){
+      // only a plain value is usable; any other operand type yields null
+      if(explicit.getParameter() instanceof StreamExpressionValue){
+        return ((StreamExpressionValue)explicit.getParameter()).getValue();
+      }
+      return null;
+    }
+    
+    // nothing explicit was given, so fall back to what the factory knows
+    String registered = factory.getCollectionZkHost(collectionName);
+    return registered != null ? registered : factory.getDefaultZkHost();
+  }
+  
+  private void sendCommit() throws IOException {
+    // Issues one commit to the target collection through the cached cloud client.
+    try {
+      clientCache.getCloudSolrClient(zkHost).commit(collection, waitFlush, waitSearcher, softCommit);
+      docsSinceCommit = 0; // start counting again, otherwise every tuple after the first full batch re-triggers a commit
+    } catch (SolrServerException | IOException e) {
+      LOG.warn(String.format(Locale.ROOT, "Unable to commit documents to collection '%s' due to unexpected error.", collection), e);
+      // chain e as the cause so the original stack trace is not lost on rethrow
+      throw new IOException(String.format(Locale.ROOT,"Unexpected error when committing documents to collection %s- %s:%s", collection, e.getClass().getName(), e.getMessage()), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/edde4335/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
index 5b1aae7..55291bf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 public class UpdateStream extends TupleStream implements Expressible {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static final String BATCH_INDEXED_FIELD_NAME = "batchIndexed"; // field name in summary tuple for #docs updated in batch
   private String collection;
   private String zkHost;
   private int updateBatchSize;
@@ -307,7 +308,7 @@ public class UpdateStream extends TupleStream implements Expressible {
     Map m = new HashMap();
     this.totalDocsIndex += batchSize;
     ++batchNumber;
-    m.put("batchIndexed", batchSize);
+    m.put(BATCH_INDEXED_FIELD_NAME, batchSize);
     m.put("totalIndexed", this.totalDocsIndex);
     m.put("batchNumber", batchNumber);
     if(coreName != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/edde4335/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index f127d36..d2e72df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -174,6 +174,41 @@ public class StreamFactory implements Serializable {
     return matchingStreamExpressions;   
   }
   
+  public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{
+    StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
+    
+    if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){
+      if(null != defaultValue){
+        return defaultValue;
+      }
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type integer but didn't find one",expression, paramName));
+    }
+    String nStr = ((StreamExpressionValue)param.getParameter()).getValue();
+    try{
+      return Integer.parseInt(nStr);
+    }
+    catch(NumberFormatException e){
+      if(null != defaultValue){
+        return defaultValue;
+      }
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.",expression, paramName, nStr));
+    }
+  }
+
+  public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException{
+    StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
+    // Boolean.parseBoolean never fails: any text other than "true" (ignoring case) reads as false
+    if(null != param && param.getParameter() instanceof StreamExpressionValue){
+      return Boolean.parseBoolean(((StreamExpressionValue)param.getParameter()).getValue());
+    }
+    // operand missing or not a plain value: use the default when one was supplied
+    if(null == defaultValue){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one",expression, paramName));
+    }
+    return defaultValue;
+  }
+
+  
   public TupleStream constructStream(String expressionClause) throws IOException {
     return constructStream(StreamExpressionParser.parse(expressionClause));
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/edde4335/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 4a3db77..842f6a6 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -3202,6 +3202,346 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
   }
 
+  ////////////////////////////////////////////
+  @Test
+  public void testCommitStream() throws Exception {
+
+    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destinationCollection", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa",  "s_multi", "bbbb",  "i_multi", "4", "i_multi", "7")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection1");
+    
+    StreamExpression expression;
+    TupleStream stream;
+    Tuple t;
+    
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+      .withCollectionZkHost("destinationCollection", cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("update", UpdateStream.class)
+      .withFunctionName("commit", CommitStream.class);
+    
+    //Copy all docs to destinationCollection
+    expression = StreamExpressionParser.parse("commit(destinationCollection, batchSize=2, update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\")))");
+    stream = factory.constructStream(expression);
+    List<Tuple> tuples = getTuples(stream);
+    
+    //Ensure that all CommitStream tuples indicate the correct number of copied/indexed docs
+    assert(tuples.size() == 1);
+    t = tuples.get(0);
+    assert(t.EOF == false);
+    assertEquals(5, t.get("batchIndexed"));
+    
+    //Ensure that destinationCollection actually has the new docs.
+    expression = StreamExpressionParser.parse("search(destinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+    assertEquals(5, tuples.size());
+
+    Tuple tuple = tuples.get(0);
+    assert(tuple.getLong("id") == 0);
+    assert(tuple.get("a_s").equals("hello0"));
+    assert(tuple.getLong("a_i") == 0);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
+
+    tuple = tuples.get(1);
+    assert(tuple.getLong("id") == 1);
+    assert(tuple.get("a_s").equals("hello1"));
+    assert(tuple.getLong("a_i") == 1);
+    assert(tuple.getDouble("a_f") == 1.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
+
+    tuple = tuples.get(2);
+    assert(tuple.getLong("id") == 2);
+    assert(tuple.get("a_s").equals("hello2"));
+    assert(tuple.getLong("a_i") == 2);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
+
+    tuple = tuples.get(3);
+    assert(tuple.getLong("id") == 3);
+    assert(tuple.get("a_s").equals("hello3"));
+    assert(tuple.getLong("a_i") == 3);
+    assert(tuple.getDouble("a_f") == 3.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
+
+    tuple = tuples.get(4);
+    assert(tuple.getLong("id") == 4);
+    assert(tuple.get("a_s").equals("hello4"));
+    assert(tuple.getLong("a_i") == 4);
+    assert(tuple.getDouble("a_f") == 4.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
+
+    CollectionAdminRequest.deleteCollection("destinationCollection").process(cluster.getSolrClient());
+  }
+
+  @Test
+  public void testParallelCommitStream() throws Exception {
+
+    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa",  "s_multi", "bbbb",  "i_multi", "4", "i_multi", "7")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection1");
+    
+    StreamExpression expression;
+    TupleStream stream;
+    Tuple t;
+    
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory factory = new StreamFactory()
+      .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+      .withCollectionZkHost("parallelDestinationCollection", cluster.getZkServer().getZkAddress())
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("update", UpdateStream.class)
+      .withFunctionName("commit", CommitStream.class)
+      .withFunctionName("parallel", ParallelStream.class);
+
+    //Copy all docs to destinationCollection
+    String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")))";
+    TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
+    List<Tuple> tuples = getTuples(parallelUpdateStream);
+    
+    //Ensure that all UpdateStream tuples indicate the correct number of copied/indexed docs
+    long count = 0;
+
+    for(Tuple tuple : tuples) {
+      count+=tuple.getLong("batchIndexed");
+    }
+
+    assert(count == 5);
+
+    //Ensure that destinationCollection actually has the new docs.
+    expression = StreamExpressionParser.parse("search(parallelDestinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+    assertEquals(5, tuples.size());
+
+    Tuple tuple = tuples.get(0);
+    assert(tuple.getLong("id") == 0);
+    assert(tuple.get("a_s").equals("hello0"));
+    assert(tuple.getLong("a_i") == 0);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
+
+    tuple = tuples.get(1);
+    assert(tuple.getLong("id") == 1);
+    assert(tuple.get("a_s").equals("hello1"));
+    assert(tuple.getLong("a_i") == 1);
+    assert(tuple.getDouble("a_f") == 1.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
+
+    tuple = tuples.get(2);
+    assert(tuple.getLong("id") == 2);
+    assert(tuple.get("a_s").equals("hello2"));
+    assert(tuple.getLong("a_i") == 2);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
+
+    tuple = tuples.get(3);
+    assert(tuple.getLong("id") == 3);
+    assert(tuple.get("a_s").equals("hello3"));
+    assert(tuple.getLong("a_i") == 3);
+    assert(tuple.getDouble("a_f") == 3.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
+
+    tuple = tuples.get(4);
+    assert(tuple.getLong("id") == 4);
+    assert(tuple.get("a_s").equals("hello4"));
+    assert(tuple.getLong("a_i") == 4);
+    assert(tuple.getDouble("a_f") == 4.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
+
+    CollectionAdminRequest.deleteCollection("parallelDestinationCollection").process(cluster.getSolrClient());
+  }
+
+  @Test
+  public void testParallelDaemonCommitStream() throws Exception {
+
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa",  "s_multi", "bbbb",  "i_multi", "4", "i_multi", "7")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection1");
+
+    StreamExpression expression;
+    TupleStream stream;
+    Tuple t;
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress())
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("update", UpdateStream.class)
+        .withFunctionName("commit", CommitStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    //Copy all docs to destinationCollection
+    String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))), runInterval=\"1000\", id=\"test\")";
+    TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
+    List<Tuple> tuples = getTuples(parallelUpdateStream);
+    assert(tuples.size() == 2);
+
+    //Lets sleep long enough for daemon updates to run.
+    //Lets stop the daemons
+    ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
+
+    int workersComplete = 0;
+    for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      int iterations = 0;
+      INNER:
+      while(iterations == 0) {
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
+        solrStream.open();
+        Tuple tupleResponse = solrStream.read();
+        if (tupleResponse.EOF) {
+          solrStream.close();
+          break INNER;
+        } else {
+          long l = tupleResponse.getLong("iterations");
+          if(l > 0) {
+            ++workersComplete;
+          } else {
+            try {
+              Thread.sleep(1000);
+            } catch(Exception e) {
+            }
+          }
+          iterations = (int) l;
+          solrStream.close();
+        }
+      }
+    }
+
+    assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
+
+    //Let's stop the daemons
+    sParams = new ModifiableSolrParams();
+    sParams.set(CommonParams.QT, "/stream");
+    sParams.set("action", "stop");
+    sParams.set("id", "test");
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
+      solrStream.open();
+      Tuple tupleResponse = solrStream.read();
+      solrStream.close();
+    }
+
+    sParams = new ModifiableSolrParams();
+    sParams.set(CommonParams.QT, "/stream");
+    sParams.set("action", "list");
+
+    workersComplete = 0;
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      long stopTime = 0;
+      INNER:
+      while(stopTime == 0) {
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
+        solrStream.open();
+        Tuple tupleResponse = solrStream.read();
+        if (tupleResponse.EOF) {
+          solrStream.close();
+          break INNER;
+        } else {
+          stopTime = tupleResponse.getLong("stopTime");
+          if (stopTime > 0) {
+            ++workersComplete;
+          } else {
+            try {
+              Thread.sleep(1000);
+            } catch(Exception e) {
+
+            }
+          }
+          solrStream.close();
+        }
+      }
+    }
+
+    assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
+    //Ensure that parallelDestinationCollection1 actually has the new docs.
+    expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+    assertEquals(5, tuples.size());
+
+    Tuple tuple = tuples.get(0);
+    assert(tuple.getLong("id") == 0);
+    assert(tuple.get("a_s").equals("hello0"));
+    assert(tuple.getLong("a_i") == 0);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
+
+    tuple = tuples.get(1);
+    assert(tuple.getLong("id") == 1);
+    assert(tuple.get("a_s").equals("hello1"));
+    assert(tuple.getLong("a_i") == 1);
+    assert(tuple.getDouble("a_f") == 1.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
+
+    tuple = tuples.get(2);
+    assert(tuple.getLong("id") == 2);
+    assert(tuple.get("a_s").equals("hello2"));
+    assert(tuple.getLong("a_i") == 2);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
+
+    tuple = tuples.get(3);
+    assert(tuple.getLong("id") == 3);
+    assert(tuple.get("a_s").equals("hello3"));
+    assert(tuple.getLong("a_i") == 3);
+    assert(tuple.getDouble("a_f") == 3.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
+
+    tuple = tuples.get(4);
+    assert(tuple.getLong("id") == 4);
+    assert(tuple.get("a_s").equals("hello4"));
+    assert(tuple.getLong("a_i") == 4);
+    assert(tuple.getDouble("a_f") == 4.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
+
+    CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
+  }
+  ////////////////////////////////////////////  
+  
   @Test
   public void testIntersectStream() throws Exception {