Posted to commits@lucene.apache.org by cp...@apache.org on 2016/10/24 18:32:18 UTC

[14/50] [abbrv] lucene-solr:jira/solr-8542-v2: SOLR-9417: Allow daemons to terminate when they finish iterating a topic

SOLR-9417: Allow daemons to terminate when they finish iterating a topic


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f43742ac
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f43742ac
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f43742ac

Branch: refs/heads/jira/solr-8542-v2
Commit: f43742acc5148ea89a9a625818a7229d56e0558e
Parents: d03cc92
Author: Joel Bernstein <jb...@apache.org>
Authored: Wed Oct 19 13:16:01 2016 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Wed Oct 19 13:17:06 2016 -0400

----------------------------------------------------------------------
 .../org/apache/solr/handler/StreamHandler.java  |   4 +-
 .../client/solrj/io/stream/DaemonStream.java    |  44 ++++-
 .../solrj/io/stream/StreamExpressionTest.java   | 193 +++++++++++++++++--
 3 files changed, 213 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
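
For orientation before the per-file hunks: the commit adds a terminate parameter to the daemon streaming expression, defaulting to false. A minimal usage sketch in the style of the new test added below (the collection name, zkHost variable, and parameter values here are illustrative, not prescriptive):

    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("collection1", zkHost)
        .withFunctionName("topic", TopicStream.class)
        .withFunctionName("daemon", DaemonStream.class);

    // terminate=true: the daemon stops itself once the wrapped topic() reports an empty run.
    StreamExpression expression = StreamExpressionParser.parse(
        "daemon(topic(collection1, collection1, q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", fl=\"id\"), "
      + "id=test, runInterval=1000, terminate=true, queueSize=50)");
    DaemonStream daemonStream = (DaemonStream) factory.constructStream(expression);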


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f43742ac/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index b9f30bc..3e841bd 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -97,7 +98,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
   private StreamFactory streamFactory = new StreamFactory();
   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private String coreName;
-  private Map<String, DaemonStream> daemons = new HashMap<>();
+  private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap<>());
 
   @Override
   public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@@ -245,6 +246,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       if(daemons.containsKey(daemonStream.getId())) {
         daemons.remove(daemonStream.getId()).close();
       }
+      daemonStream.setDaemons(daemons);
       daemonStream.open();  //This will start the daemonStream
       daemons.put(daemonStream.getId(), daemonStream);
       rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f43742ac/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 77648df..8214f9a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -52,6 +52,8 @@ public class DaemonStream extends TupleStream implements Expressible {
   private Exception exception;
   private long runInterval;
   private String id;
+  private Map<String, DaemonStream> daemons;
+  private boolean terminate;
   private boolean closed = false;
   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -64,10 +66,13 @@ public class DaemonStream extends TupleStream implements Expressible {
     StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
     StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
     StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
+    StreamExpressionNamedParameter terminateExpression = factory.getNamedOperand(expression, "terminate");
+
 
     String id = null;
     long runInterval = 0L;
     int queueSize = 0;
+    boolean terminate = false;
 
     if(idExpression == null) {
       throw new IOException("Invalid expression id parameter expected");
@@ -82,24 +87,26 @@ public class DaemonStream extends TupleStream implements Expressible {
     }
 
     if(queueExpression != null) {
-       queueSize= Integer.parseInt(((StreamExpressionValue)queueExpression.getParameter()).getValue());
+       queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue());
     }
 
-    // validate expression contains only what we want.
-    if(expression.getParameters().size() != streamExpressions.size() + 2 &&
-        expression.getParameters().size() != streamExpressions.size() + 3) {
-      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    if(terminateExpression != null) {
+      terminate = Boolean.parseBoolean(((StreamExpressionValue) terminateExpression.getParameter()).getValue());
     }
 
     if(1 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }
 
-    init(tupleStream, id, runInterval, queueSize);
+    init(tupleStream, id, runInterval, queueSize, terminate);
+  }
+
+  public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
+    init(tupleStream, id, runInterval, queueSize, terminate);
   }
 
   public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
-    init(tupleStream, id, runInterval, queueSize);
+    this(tupleStream, id, runInterval, queueSize, false);
   }
 
   @Override
@@ -126,6 +133,7 @@ public class DaemonStream extends TupleStream implements Expressible {
     expression.addParameter(new StreamExpressionNamedParameter("id", id));
     expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
     expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
+    expression.addParameter(new StreamExpressionNamedParameter("terminate", Boolean.toString(terminate)));
 
     return expression;
   }
@@ -148,10 +156,16 @@ public class DaemonStream extends TupleStream implements Expressible {
   }
 
   public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
+    init(tupleStream, id, runInterval, queueSize, false);
+  }
+
+  public void init(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
     this.tupleStream = tupleStream;
     this.id = id;
     this.runInterval = runInterval;
     this.queueSize = queueSize;
+    this.terminate = terminate;
+
     if(queueSize > 0) {
       queue = new ArrayBlockingQueue(queueSize);
       eatTuples = false;
@@ -228,6 +242,10 @@ public class DaemonStream extends TupleStream implements Expressible {
     return tuple;
   }
 
+  public void setDaemons(Map<String, DaemonStream> daemons) {
+    this.daemons = daemons;
+  }
+
   private synchronized void incrementIterations() {
     ++iterations;
   }
@@ -279,6 +297,18 @@ public class DaemonStream extends TupleStream implements Expressible {
                 errors = 0; // Reset errors on successful run.
                 if (tuple.fields.containsKey("sleepMillis")) {
                   this.sleepMillis = tuple.getLong("sleepMillis");
+
+                  if(terminate && sleepMillis > 0) {
+                    //TopicStream provides sleepMillis > 0 if the last run had no Tuples.
+                    //This means the topic queue is empty. Time to terminate.
+                    //Remove ourselves from the daemons map.
+                    if(daemons != null) {
+                      daemons.remove(id);
+                    }
+                    //Break out of the thread loop and end the run.
+                    break OUTER;
+                  }
+
                   this.runInterval = -1;
                 }
                 break INNER;
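
Taken together, the DaemonStream changes above thread a terminate flag through the constructor, init(), and toExpression(), and make the run loop exit once the wrapped TopicStream signals an empty pass. A minimal programmatic sketch, assuming topicStream and context are built as in the tests further down:

    // New 5-arg constructor: stream, id, runInterval (ms), queueSize, terminate.
    DaemonStream daemon = new DaemonStream(topicStream, "test", 1000, 50, true);
    daemon.setStreamContext(context);
    daemon.open();   // starts the background thread
    // read() drains tuples from the internal queue (size 50 here); when the topic reports
    // sleepMillis > 0 (a run with no tuples), the daemon removes itself from the handler's
    // daemons map if one was set via setDaemons(), then breaks out of its run loop, so reads
    // eventually finish -- the new testTerminatingDaemonStream below relies on this.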

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f43742ac/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 842f6a6..7b5777d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -533,24 +533,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     // Basic test desc
     expression = StreamExpressionParser.parse("top("
-                                              + "n=2,"
-                                              + "unique("
-                                              +   "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-                                              +   "over=\"a_f\"),"
-                                              + "sort=\"a_f desc\")");
+        + "n=2,"
+        + "unique("
+        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+        + "over=\"a_f\"),"
+        + "sort=\"a_f desc\")");
     stream = new RankStream(expression, factory);
     tuples = getTuples(stream);
     
     assert(tuples.size() == 2);
-    assertOrder(tuples, 4,3);
+    assertOrder(tuples, 4, 3);
     
     // full factory
     stream = factory.constructStream("top("
-                                    + "n=4,"
-                                    + "unique("
-                                    +   "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-                                    +   "over=\"a_f\"),"
-                                    + "sort=\"a_f asc\")");
+        + "n=4,"
+        + "unique("
+        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+        + "over=\"a_f\"),"
+        + "sort=\"a_f asc\")");
     tuples = getTuples(stream);
     
     assert(tuples.size() == 4);
@@ -827,7 +827,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
         .withFunctionName("parallel", ParallelStream.class)
         .withFunctionName("fetch", FetchStream.class);
 
-    stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+",  search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+    stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ",  search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
     tuples = getTuples(stream);
 
     assert(tuples.size() == 10);
@@ -853,7 +853,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assertTrue("blah blah blah 9".equals(t.getString("subject")));
 
 
-    stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+",  search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+    stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ",  search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
     tuples = getTuples(stream);
 
     assert(tuples.size() == 10);
@@ -1003,6 +1003,45 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
   }
 
+
+  @Test
+  public void testTerminatingDaemonStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    DaemonStream daemonStream;
+
+    SolrClientCache cache = new SolrClientCache();
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(cache);
+    expression = StreamExpressionParser.parse("daemon(topic("+COLLECTION+","+COLLECTION+", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
+        + "), id=test, runInterval=1000, terminate=true, queueSize=50)");
+    daemonStream = (DaemonStream)factory.constructStream(expression);
+    daemonStream.setStreamContext(context);
+
+    List<Tuple> tuples = getTuples(daemonStream);
+    assertTrue(tuples.size() == 10);
+    cache.close();
+  }
+
+
   @Test
   public void testRollupStream() throws Exception {
 
@@ -1367,7 +1406,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
 
     assert(tuples.size() == 9);
-    assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
+    assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
 
     //Test descending
 
@@ -1376,7 +1415,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(pstream);
 
     assert(tuples.size() == 8);
-    assertOrder(tuples, 9,8,6,4,3,2,1,0);
+    assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
 
   }
 
@@ -1627,7 +1666,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new LeftOuterJoinStream(expression, factory);
     tuples = getTuples(stream);    
     assert(tuples.size() == 10);
-    assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2);
+    assertOrder(tuples, 7, 6, 3, 4, 5, 1, 1, 15, 15, 2);
     
     // Results in both searches, no join matches
     expression = StreamExpressionParser.parse("leftOuterJoin("
@@ -1637,7 +1676,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new LeftOuterJoinStream(expression, factory);
     tuples = getTuples(stream);    
     assert(tuples.size() == 8);
-    assertOrder(tuples, 1,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 15, 2, 3, 4, 5, 6, 7);
     
     // Differing field names
     expression = StreamExpressionParser.parse("leftOuterJoin("
@@ -1647,7 +1686,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new LeftOuterJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 10);
-    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
 
   }
 
@@ -1764,7 +1803,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new OuterHashJoinStream(expression, factory);
     tuples = getTuples(stream);    
     assert(tuples.size() == 10);
-    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
 
     // Basic desc
     expression = StreamExpressionParser.parse("outerHashJoin("
@@ -1794,7 +1833,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new OuterHashJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 10);
-    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
   }
 
   @Test
@@ -3202,6 +3241,120 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
   }
 
+
+  @Test
+  public void testParallelTerminatingDaemonUpdateStream() throws Exception {
+
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa",  "s_multi", "bbbb",  "i_multi", "4", "i_multi", "7")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection1");
+
+    StreamExpression expression;
+    TupleStream stream;
+    Tuple t;
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("update", UpdateStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    //Copy all docs to destinationCollection
+    String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", partitionKeys=\"a_f\", initialCheckpoint=0, id=\"topic1\")), terminate=true, runInterval=\"1000\", id=\"test\")";
+    TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
+    List<Tuple> tuples = getTuples(parallelUpdateStream);
+    assert(tuples.size() == 2);
+
+
+    ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
+
+    int workersComplete = 0;
+
+    //Daemons should terminate after the topic is completed
+    //Loop through all shards and wait for the daemons to be gone from the listing.
+    for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      INNER:
+      while(true) {
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
+        solrStream.open();
+        Tuple tupleResponse = solrStream.read();
+        if (tupleResponse.EOF) {
+          solrStream.close();
+          ++workersComplete;
+          break INNER;
+        } else {
+          solrStream.close();
+          Thread.sleep(1000);
+        }
+      }
+    }
+
+    assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
+
+    cluster.getSolrClient().commit("parallelDestinationCollection1");
+
+    //Ensure that destinationCollection actually has the new docs.
+    expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+    assertEquals(5, tuples.size());
+
+    Tuple tuple = tuples.get(0);
+    assert(tuple.getLong("id") == 0);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 0);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
+
+    tuple = tuples.get(1);
+    assert(tuple.getLong("id") == 1);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 1);
+    assert(tuple.getDouble("a_f") == 1.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
+
+    tuple = tuples.get(2);
+    assert(tuple.getLong("id") == 2);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 2);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
+
+    tuple = tuples.get(3);
+    assert(tuple.getLong("id") == 3);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 3);
+    assert(tuple.getDouble("a_f") == 3.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
+
+    tuple = tuples.get(4);
+    assert(tuple.getLong("id") == 4);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 4);
+    assert(tuple.getDouble("a_f") == 4.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
+
+    CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
+  }
+
+
+
   ////////////////////////////////////////////
   @Test
   public void testCommitStream() throws Exception {