You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2016/07/17 03:18:07 UTC

[1/2] lucene-solr:master: SOLR-9240: Added testcase with text field in the fl for topic

Repository: lucene-solr
Updated Branches:
  refs/heads/master 74633594d -> c3c1f8d6e


SOLR-9240: Added testcase with text field in the fl for topic


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c3c1f8d6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c3c1f8d6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c3c1f8d6

Branch: refs/heads/master
Commit: c3c1f8d6e6cb57cb30e736d5ff0387400729d216
Parents: fc3894e
Author: jbernste <jb...@apache.org>
Authored: Tue Jul 12 11:36:05 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Sat Jul 16 22:36:30 2016 -0400

----------------------------------------------------------------------
 .../solrj/io/stream/StreamExpressionTest.java   | 79 +++++++++++++++++---
 1 file changed, 69 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3c1f8d6/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 4af565a..f2446f3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -2533,16 +2533,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   public void testParallelTopicStream() throws Exception {
 
     new UpdateRequest()
-        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
-        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
-        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
-        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
-        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
-        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
-        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
-        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
-        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
-        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1", "subject", "ha ha bla blah0")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2", "subject", "ha ha bla blah2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "subject", "ha ha bla blah3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "subject", "ha ha bla blah4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5", "subject", "ha ha bla blah5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6","subject", "ha ha bla blah6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7","subject", "ha ha bla blah7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8", "subject", "ha ha bla blah8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9", "subject", "ha ha bla blah9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10", "subject", "ha ha bla blah10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
     StreamFactory factory = new StreamFactory()
@@ -2653,6 +2653,37 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       context.setSolrClientCache(cache);
       stream.setStreamContext(context);
       assertTopicRun(stream, "12","13");
+
+      //Test text extraction
+
+      expression = StreamExpressionParser.parse("parallel(collection1, " +
+          "workers=\"2\", " +
+          "sort=\"_version_ asc\"," +
+          "topic(collection1, " +
+          "collection1, " +
+          "q=\"subject:bla\", " +
+          "fl=\"subject\", " +
+          "id=\"3000000\", " +
+          "initialCheckpoint=\"0\", " +
+          "partitionKeys=\"id\"))");
+
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+
+      assertTopicSubject(stream, "ha ha bla blah0",
+          "ha ha bla blah1",
+          "ha ha bla blah2",
+          "ha ha bla blah3",
+          "ha ha bla blah4",
+          "ha ha bla blah5",
+          "ha ha bla blah6",
+          "ha ha bla blah7",
+          "ha ha bla blah8",
+          "ha ha bla blah9",
+          "ha ha bla blah10");
+
     } finally {
       cache.close();
     }
@@ -3314,4 +3345,32 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       throw new Exception("Wrong count in topic run:"+count);
     }
   }
+
+  private void assertTopicSubject(TupleStream stream, String... textArray) throws Exception {
+    long version = -1;
+    int count = 0;
+    List<String> texts = new ArrayList();
+    for(String text : textArray) {
+      texts.add(text);
+    }
+
+    try {
+      stream.open();
+      while (true) {
+        Tuple tuple = stream.read();
+        if (tuple.EOF) {
+          break;
+        } else {
+          ++count;
+          String subject = tuple.getString("subject");
+          if (!texts.contains(subject)) {
+            throw new Exception("Expecting subject in topic run not found:" + subject);
+          }
+        }
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
 }


[2/2] lucene-solr:master: SOLR-9240:Support running the topic() Streaming Expression in parallel mode.

Posted by jb...@apache.org.
SOLR-9240:Support running the topic() Streaming Expression in parallel mode.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/fc3894e8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/fc3894e8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/fc3894e8

Branch: refs/heads/master
Commit: fc3894e837701b78a4704cf27529c34c15666586
Parents: 7463359
Author: jbernste <jb...@apache.org>
Authored: Mon Jul 11 20:10:27 2016 -0400
Committer: jbernste <jb...@apache.org>
Committed: Sat Jul 16 22:36:30 2016 -0400

----------------------------------------------------------------------
 .../client/solrj/io/stream/ParallelStream.java  |   2 +-
 .../client/solrj/io/stream/TopicStream.java     |  41 +++-
 .../solr/configsets/streaming/conf/schema.xml   |   2 +-
 .../solr/client/solrj/io/sql/JdbcTest.java      |   4 +-
 .../solrj/io/stream/StreamExpressionTest.java   | 185 ++++++++++++++++++-
 .../client/solrj/io/stream/StreamingTest.java   |  19 +-
 6 files changed, 231 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc3894e8/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 779cc31..3125ff0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -101,7 +101,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
 
     // Workers
     if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
-      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workersParam' parameter of type positive integer but didn't find one",expression));
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workers' parameter of type positive integer but didn't find one",expression));
     }
     String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue();
     int workersInt = 0;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc3894e8/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index 30c6f59..c4343c6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -74,9 +75,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
   private int runCount;
   private String id;
   protected long checkpointEvery;
-
   private Map<String, Long> checkpoints = new HashMap<String, Long>();
   private String checkpointCollection;
+  private long initialCheckpoint = -1;
 
   // Use TopicStream that takes a SolrParams
   @Deprecated
@@ -84,12 +85,14 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
                      String checkpointCollection,
                      String collection,
                      String id,
+                     long initialCheckpoint,
                      long checkpointEvery,
                      Map<String, String> params) {
     init(zkHost,
          checkpointCollection,
          collection,
          id,
+         initialCheckpoint,
          checkpointEvery,
          new MapSolrParams(params));
   }
@@ -98,12 +101,14 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
                      String checkpointCollection,
                      String collection,
                      String id,
+                     long initialCheckpoint,
                      long checkpointEvery,
                      SolrParams params) {
     init(zkHost,
         checkpointCollection,
         collection,
         id,
+        initialCheckpoint,
         checkpointEvery,
         params);
   }
@@ -113,6 +118,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
                     String checkpointCollection,
                     String collection,
                     String id,
+                    long initialCheckpoint,
                     long checkpointEvery,
                     SolrParams params) {
     this.zkHost  = zkHost;
@@ -121,11 +127,13 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     if(mParams.getParams("rows") == null) {
       mParams.set("rows", "500");
     }
+
     this.params  = mParams; 
     this.collection = collection;
     this.checkpointCollection = checkpointCollection;
     this.checkpointEvery = checkpointEvery;
     this.id = id;
+    this.initialCheckpoint = initialCheckpoint;
     this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
   }
 
@@ -147,6 +155,13 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
       throw new IOException("invalid TopicStream fl cannot be null");
     }
 
+    long initialCheckpoint = -1;
+    StreamExpressionNamedParameter initialCheckpointParam = factory.getNamedOperand(expression, "initialCheckpoint");
+
+    if(initialCheckpointParam != null) {
+      initialCheckpoint = Long.parseLong(((StreamExpressionValue) initialCheckpointParam.getParameter()).getValue());
+    }
+
     long checkpointEvery = -1;
     StreamExpressionNamedParameter checkpointEveryParam = factory.getNamedOperand(expression, "checkpointEvery");
 
@@ -198,6 +213,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
         checkpointCollectionName,
         collectionName,
         ((StreamExpressionValue) idParam.getParameter()).getValue(),
+        initialCheckpoint,
         checkpointEvery,
         params);
   }
@@ -226,6 +242,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     // zkHost
     expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
     expression.addParameter(new StreamExpressionNamedParameter("id", id));
+    if(initialCheckpoint > -1) {
+      expression.addParameter(new StreamExpressionNamedParameter("initialCheckpoint", Long.toString(initialCheckpoint)));
+    }
     expression.addParameter(new StreamExpressionNamedParameter("checkpointEvery", Long.toString(checkpointEvery)));
 
     return expression;
@@ -279,6 +298,11 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     this.solrStreams = new ArrayList();
     this.eofTuples = Collections.synchronizedMap(new HashMap());
 
+    if(checkpoints.size() == 0 && streamContext.numWorkers > 1) {
+      //Each worker must maintain it's own checkpoints
+      this.id = this.id+"_"+streamContext.workerID;
+    }
+
     if(streamContext.getSolrClientCache() != null) {
       cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
     } else {
@@ -385,7 +409,13 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
 
     for(Slice slice : slices) {
       String sliceName = slice.getName();
-      long checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
+      long checkpoint = 0;
+      if(initialCheckpoint > -1) {
+        checkpoint = initialCheckpoint;
+      } else {
+        checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
+      }
+
       this.checkpoints.put(sliceName, checkpoint);
     }
   }
@@ -405,7 +435,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
         SolrStream solrStream = new SolrStream(coreUrl, params);
 
         if(streamContext != null) {
-          solrStream.setStreamContext(streamContext);
+          StreamContext localContext = new StreamContext();
+          localContext.setSolrClientCache(streamContext.getSolrClientCache());
+          solrStream.setStreamContext(localContext);
         }
 
         try {
@@ -502,6 +534,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
           throw new Exception("Collection not found:" + this.collection);
         }
       }
+
+
+      Iterator<String> iterator = params.getParameterNamesIterator();
       ModifiableSolrParams mParams = new ModifiableSolrParams(params);
       mParams.set("distrib", "false"); // We are the aggregator.
       String fl = mParams.get("fl");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc3894e8/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
index 34ecdcb..e7f2772 100644
--- a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/schema.xml
@@ -387,7 +387,7 @@
     -->
 
 
-    <field name="id" type="int" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
     <field name="signatureField" type="string" indexed="true" stored="false"/>
 
     <field name="s_multi" type="string" indexed="true" stored="true" docValues="true" multiValued="true"/>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc3894e8/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
index a031e58..41f3309 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
@@ -615,13 +615,13 @@ public class JdbcTest extends SolrCloudTestCase {
     assertEquals("my_float_col".length(), resultSetMetaData.getColumnDisplaySize(4));
     assertEquals("testnull_i".length(), resultSetMetaData.getColumnDisplaySize(5));
 
-    assertEquals("Long", resultSetMetaData.getColumnTypeName(1));
+    assertEquals("String", resultSetMetaData.getColumnTypeName(1));
     assertEquals("Long", resultSetMetaData.getColumnTypeName(2));
     assertEquals("String", resultSetMetaData.getColumnTypeName(3));
     assertEquals("Double", resultSetMetaData.getColumnTypeName(4));
     assertEquals("Long", resultSetMetaData.getColumnTypeName(5));
 
-    assertEquals(Types.DOUBLE, resultSetMetaData.getColumnType(1));
+    assertEquals(Types.VARCHAR, resultSetMetaData.getColumnType(1));
     assertEquals(Types.DOUBLE, resultSetMetaData.getColumnType(2));
     assertEquals(Types.VARCHAR, resultSetMetaData.getColumnType(3));
     assertEquals(Types.DOUBLE, resultSetMetaData.getColumnType(4));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc3894e8/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index b1da1c6..4af565a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -261,7 +261,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(stream);
 
     assertEquals(5, tuples.size());
-    assertOrder(tuples, 0,2,1,3,4);
+    assertOrder(tuples, 0, 2, 1, 3, 4);
   }
 
   @Test
@@ -1548,7 +1548,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new HashJoinStream(expression, factory);
     tuples = getTuples(stream);
     assertEquals(17, tuples.size());
-    assertOrder(tuples, 1, 1, 2, 2, 15, 15, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7);
+
+    //Does a lexical sort
+    assertOrder(tuples, 1, 1, 15, 15, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7);
 
   }
 
@@ -2526,6 +2528,138 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     }
   }
 
+
+  @Test
+  public void testParallelTopicStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List<Tuple> tuples;
+
+    SolrClientCache cache = new SolrClientCache();
+
+    try {
+      //Store checkpoints in the same index as the main documents. This is perfectly valid
+      expression = StreamExpressionParser.parse("parallel(collection1, " +
+                                                         "workers=\"2\", " +
+                                                         "sort=\"_version_ asc\"," +
+                                                         "topic(collection1, " +
+                                                               "collection1, " +
+                                                               "q=\"a_s:hello\", " +
+                                                               "fl=\"id\", " +
+                                                               "id=\"1000000\", " +
+                                                               "partitionKeys=\"id\"))");
+
+      stream = factory.constructStream(expression);
+      StreamContext context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      //Should be zero because the checkpoints will be set to the highest version on the shards.
+      assertEquals(tuples.size(), 0);
+
+      cluster.getSolrClient().commit("collection1");
+      //Now check to see if the checkpoints are present
+
+      expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000*\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+      assertEquals(tuples.size(), 2);
+      List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+      assertEquals(checkpoints.size(), 2);
+      String id1 = tuples.get(0).getString("id");
+      String id2 = tuples.get(1).getString("id");
+      assertTrue(id1.equals("1000000_0"));
+      assertTrue(id2.equals("1000000_1"));
+
+      //Index a few more documents
+      new UpdateRequest()
+          .add(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9")
+          .add(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10")
+          .commit(cluster.getSolrClient(), COLLECTION);
+
+      expression = StreamExpressionParser.parse("parallel(collection1, " +
+          "workers=\"2\", " +
+          "sort=\"_version_ asc\"," +
+          "topic(collection1, " +
+          "collection1, " +
+          "q=\"a_s:hello\", " +
+          "fl=\"id\", " +
+          "id=\"1000000\", " +
+          "partitionKeys=\"id\"))");
+
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+
+      assertTopicRun(stream, "10", "11");
+
+      //Test will initial checkpoint. This should pull all
+
+      expression = StreamExpressionParser.parse("parallel(collection1, " +
+          "workers=\"2\", " +
+          "sort=\"_version_ asc\"," +
+          "topic(collection1, " +
+          "collection1, " +
+          "q=\"a_s:hello\", " +
+          "fl=\"id\", " +
+          "id=\"2000000\", " +
+          "initialCheckpoint=\"0\", " +
+          "partitionKeys=\"id\"))");
+
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      assertTopicRun(stream, "0","1","2","3","4","5","6","7","8","9","10","11");
+
+      //Add more documents
+      //Index a few more documents
+      new UpdateRequest()
+          .add(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9")
+          .add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10")
+          .commit(cluster.getSolrClient(), COLLECTION);
+
+      //Run the same topic again including the initialCheckpoint. It should start where it left off.
+      //initialCheckpoint should be ignored for all but the first run.
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      assertTopicRun(stream, "12","13");
+    } finally {
+      cache.close();
+    }
+  }
+
+
+
   @Test
   public void testUpdateStream() throws Exception {
 
@@ -3031,9 +3165,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     int i = 0;
     for(int val : ids) {
       Tuple t = tuples.get(i);
-      Long tip = (Long)t.get(fieldName);
-      if(tip.intValue() != val) {
-        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      String tip = t.getString(fieldName);
+      if(!tip.equals(Integer.toString(val))) {
+        throw new Exception("Found value:"+tip+" expecting:"+val);
       }
       ++i;
     }
@@ -3119,9 +3253,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     int i=0;
     for(int val : ids) {
       Map t = maps.get(i);
-      Long tip = (Long)t.get("id");
-      if(tip.intValue() != val) {
-        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      String tip = (String)t.get("id");
+      if(!tip.equals(Integer.toString(val))) {
+        throw new Exception("Found value:"+tip+" expecting:"+val);
       }
       ++i;
     }
@@ -3145,4 +3279,39 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     return true;
   }
 
+  private void assertTopicRun(TupleStream stream, String... idArray) throws Exception {
+    long version = -1;
+    int count = 0;
+    List<String> ids = new ArrayList();
+    for(String id : idArray) {
+      ids.add(id);
+    }
+
+    try {
+      stream.open();
+      while (true) {
+        Tuple tuple = stream.read();
+        if (tuple.EOF) {
+          break;
+        } else {
+          ++count;
+          String id = tuple.getString("id");
+          if (!ids.contains(id)) {
+            throw new Exception("Expecting id in topic run not found:" + id);
+          }
+
+          long v = tuple.getLong("_version_");
+          if (v < version) {
+            throw new Exception("Out of order version in topic run:" + v);
+          }
+        }
+      }
+    } finally {
+      stream.close();
+    }
+
+    if(count != ids.size()) {
+      throw new Exception("Wrong count in topic run:"+count);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc3894e8/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 9685b74..0da6750 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -1315,7 +1315,12 @@ public class StreamingTest extends SolrCloudTestCase {
 
     SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
 
-    TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, sParams);
+    TopicStream topicStream = new TopicStream(zkHost,
+                                              COLLECTION,
+                                              COLLECTION,
+                                              "50000000",
+                                              -1,
+                                              1000000, sParams);
 
     DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
     daemonStream.setStreamContext(context);
@@ -1895,9 +1900,9 @@ public class StreamingTest extends SolrCloudTestCase {
     int i = 0;
     for(int val : ids) {
       Tuple t = tuples.get(i);
-      Long tip = (Long)t.get("id");
-      if(tip.intValue() != val) {
-        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      String tip = (String)t.get("id");
+      if(!tip.equals(Integer.toString(val))) {
+        throw new Exception("Found value:"+tip+" expecting:"+val);
       }
       ++i;
     }
@@ -1926,9 +1931,9 @@ public class StreamingTest extends SolrCloudTestCase {
     int i=0;
     for(int val : ids) {
       Map t = maps.get(i);
-      Long tip = (Long)t.get("id");
-      if(tip.intValue() != val) {
-        throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
+      String tip = (String)t.get("id");
+      if(!tip.equals(Integer.toString(val))) {
+        throw new Exception("Found value:"+tip+" expecting:"+val);
       }
       ++i;
     }