Posted to issues@solr.apache.org by GitBox <gi...@apache.org> on 2022/07/18 13:59:52 UTC

[GitHub] [solr] cpoerschke commented on a diff in pull request #935: SOLR-16286 : Topic stream not honoring initialCheckpoint in getPersis…

cpoerschke commented on code in PR #935:
URL: https://github.com/apache/solr/pull/935#discussion_r923396965


##########
solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java:
##########
@@ -4372,7 +4372,7 @@ public void testClassifyStream() throws Exception {
 
     classifyStream = new SolrStream(url, paramsLoc);
     idToLabel = getIdToLabel(classifyStream, "probability_d");
-    assertEquals(idToLabel.size(), 2);

Review Comment:
   note to self: I don't understand yet why the `testClassifyStream` expectations change here.



##########
solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java:
##########
@@ -2528,6 +2528,97 @@ public void testSubFacetStream() throws Exception {
     assertTrue(peri == 1.0D);
   }
 
+  @Test
+  public void testTopicStreamInitialCheckpoint() throws Exception {
+    Assume.assumeTrue(!useAlias);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamFactory factory =
+        new StreamFactory()
+            .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+            .withFunctionName("topic", TopicStream.class)
+            .withFunctionName("search", CloudSolrStream.class)
+            .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    TupleStream stream = null;
+    List<Tuple> tuples;
+
+    SolrClientCache cache = new SolrClientCache();
+
+    try {
+
+      // Store checkpoints in the same index as the main documents. This is perfectly valid
+      expression =
+          StreamExpressionParser.parse(
+              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", initialCheckpoint=0)");
+
+      stream = new TopicStream(expression, factory);
+      StreamContext context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+
+      // force commit of checkpoints
+      cluster.getSolrClient().commit("collection1");
+
+      expression =
+          StreamExpressionParser.parse(
+              "search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+      assertEquals(tuples.size(), 1);
+      List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+      assertEquals(checkpoints.size(), 2);

Review Comment:
   ```suggestion
         assertEquals(checkpoints.size(), 2); // one checkpoint for each shard
   ```
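   To make the "one checkpoint for each shard" expectation concrete, here is a minimal sketch of reading the `checkpoint_ss` values back into a per-shard map. It assumes each value has the form `<shard>~<version>`. That format and the `CheckpointEntries` helper are illustrative assumptions and are not part of this diff.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper; assumes each checkpoint_ss value looks like "<shard>~<version>".
   final class CheckpointEntries {
     private CheckpointEntries() {}

     /** Parses "<shard>~<version>" strings into a shard -> version map, keeping input order. */
     static Map<String, Long> parse(List<String> checkpointValues) {
       Map<String, Long> byShard = new LinkedHashMap<>();
       for (String value : checkpointValues) {
         int sep = value.lastIndexOf('~');
         // Reject values with no shard name or no version after the separator.
         if (sep <= 0 || sep == value.length() - 1) {
           throw new IllegalArgumentException("Unexpected checkpoint value: " + value);
         }
         byShard.put(value.substring(0, sep), Long.parseLong(value.substring(sep + 1)));
       }
       return byShard;
     }
   }
   ```

   With something like this, the test could assert on the shard names as well as on the count, for example `assertEquals(2, CheckpointEntries.parse(checkpoints).size())`.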



##########
solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java:
##########
@@ -3048,7 +3048,7 @@ public void testPriorityStream() throws Exception {
       expression =
           StreamExpressionParser.parse(
               "priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0),"

Review Comment:
   Looking at what the `testPriorityStream` test is testing, I wonder whether the `initialCheckpoint=0` here and at lines 3035/3036 should also be removed.
   ```suggestion
                 "priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000),"
   ```
   
   Having said that, it's not very obvious when the checkpoints from the line 3019/3020 stream will be persisted and/or otherwise carry over to the line 3035/3036 stream.
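   The concern comes from the precedence the new `testTopicStreamInitialCheckpoint` asserts. A second run with `initialCheckpoint=0` again returns all 10 tuples, so an explicit `initialCheckpoint` appears to win over persisted checkpoints. If that holds, keeping `initialCheckpoint=0` at lines 3035/3036 would make that stream start from the beginning regardless of what the earlier stream persisted. Below is a hypothetical sketch of that precedence. The class, the method, the `-1` "unset" sentinel and the `fallback` parameter are illustrative assumptions, not `TopicStream`'s actual code.

   ```java
   // Hypothetical model of the checkpoint precedence the new test appears to expect;
   // not TopicStream's implementation.
   final class CheckpointPrecedence {
     private CheckpointPrecedence() {}

     /** Assumed sentinel meaning "initialCheckpoint was not specified". */
     static final long UNSET = -1L;

     /**
      * Returns the version after which a shard's topic read starts: an explicit
      * initialCheckpoint wins, otherwise the persisted checkpoint is used, otherwise
      * the caller-supplied fallback (whatever the default start point is).
      */
     static long startingCheckpoint(long initialCheckpoint, Long persisted, long fallback) {
       if (initialCheckpoint != UNSET) {
         return initialCheckpoint;
       }
       return persisted != null ? persisted : fallback;
     }
   }
   ```

   Under this model, `startingCheckpoint(0L, 10L, fallback)` returns `0`, which is exactly the "re-read everything" behaviour that would change what the priority test observes.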



##########
solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java:
##########
@@ -2528,6 +2528,97 @@ public void testSubFacetStream() throws Exception {
     assertTrue(peri == 1.0D);
   }
 
+  @Test
+  public void testTopicStreamInitialCheckpoint() throws Exception {
+    Assume.assumeTrue(!useAlias);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamFactory factory =
+        new StreamFactory()
+            .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+            .withFunctionName("topic", TopicStream.class)
+            .withFunctionName("search", CloudSolrStream.class)
+            .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    TupleStream stream = null;
+    List<Tuple> tuples;
+
+    SolrClientCache cache = new SolrClientCache();
+
+    try {
+
+      // Store checkpoints in the same index as the main documents. This is perfectly valid
+      expression =
+          StreamExpressionParser.parse(
+              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", initialCheckpoint=0)");
+
+      stream = new TopicStream(expression, factory);
+      StreamContext context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+
+      // force commit of checkpoints
+      cluster.getSolrClient().commit("collection1");
+
+      expression =
+          StreamExpressionParser.parse(
+              "search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")");
+      stream = factory.constructStream(expression);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+      assertEquals(tuples.size(), 1);
+      List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss");
+      assertEquals(checkpoints.size(), 2);
+
+      expression =
+          StreamExpressionParser.parse(
+              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", initialCheckpoint=0)");
+
+      stream = new TopicStream(expression, factory);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      assertEquals(10, tuples.size());
+
+      expression =
+          StreamExpressionParser.parse(
+              "topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\")");
+
+      stream = new TopicStream(expression, factory);
+      context = new StreamContext();
+      context.setSolrClientCache(cache);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      // Should be zero because the checkpoints will be set to the highest vesion on the shards.

Review Comment:
   ```suggestion
         // Should be zero because the checkpoints will be set to the highest version on the shards.
   ```
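   The "should be zero" reasoning rests on a simple invariant. Once every shard's checkpoint equals the highest `_version_` on that shard, no document has a newer version, so a run without `initialCheckpoint` emits nothing. The following is a hypothetical illustration of that invariant using plain maps in place of real shards. The shard names, versions and the `TopicCatchUp` class are made up for the example.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical illustration; plain maps stand in for shards and their _version_ values.
   final class TopicCatchUp {
     private TopicCatchUp() {}

     /** Counts documents whose version is strictly greater than their shard's checkpoint. */
     static long countNewer(Map<String, List<Long>> versionsByShard, Map<String, Long> checkpoints) {
       long count = 0;
       for (Map.Entry<String, List<Long>> shard : versionsByShard.entrySet()) {
         long checkpoint = checkpoints.getOrDefault(shard.getKey(), 0L);
         for (long version : shard.getValue()) {
           if (version > checkpoint) {
             count++;
           }
         }
       }
       return count;
     }

     /** Builds a checkpoint map with each shard set to its highest version. */
     static Map<String, Long> highestVersions(Map<String, List<Long>> versionsByShard) {
       Map<String, Long> result = new HashMap<>();
       versionsByShard.forEach(
           (shard, versions) ->
               result.put(shard, versions.stream().mapToLong(Long::longValue).max().orElse(0L)));
       return result;
     }
   }
   ```

   With 10 documents split over two shards, checkpoints of `0` yield 10 newer documents, while checkpoints taken from `highestVersions` yield 0. This mirrors the two assertions in the test.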



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@solr.apache.org