Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/04/12 21:58:41 UTC

[GitHub] [pulsar] jai1 commented on a change in pull request #4031: Feature - support seek() on Reader

jai1 commented on a change in pull request #4031: Feature - support seek() on Reader
URL: https://github.com/apache/pulsar/pull/4031#discussion_r275072673
 
 

 ##########
 File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
 ##########
 @@ -510,4 +513,114 @@ public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
+        final int numOfMessage = 10;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).create();
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.send(String.format("msg num %d", i).getBytes());
+        }
+
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
+
+        assertTrue(reader.hasMessageAvailable());
+
+        // Read all messages the first time
+        for (int i = 0; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());
+        }
+
+        assertFalse(reader.hasMessageAvailable());
+
+        // Perform cursor reset by time
+        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+
+        // FIXME: This sleep is necessary because future completed by seekAsync() might complete with a state different
+        //        than Ready, this is from the fact that consumer reset makes a disconnect from broker in order to be
+        //        able to do the reset().
+        Thread.sleep(1000);
 
 Review comment:
   I didn't understand what you mean by:
   `this is from the fact that consumer reset makes a disconnect from broker in order to be able to do the reset()`
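
   If the point of the FIXME is that the reader may not be ready yet when the seek call returns, a bounded poll might be easier to reason about than a fixed `Thread.sleep(1000)`. The following is only a minimal sketch under that assumption: `PollUntil` and `awaitCondition` are hypothetical names, not part of Pulsar or of this PR, and the condition passed in is whatever check the test considers to mean "ready", which this thread has not established.

```java
// Hypothetical test helper (not a Pulsar API): waits for a condition with a timeout
// instead of sleeping for a fixed amount of time.
final class PollUntil {
    private PollUntil() {}

    /**
     * Re-checks the condition every pollMillis until it holds or timeoutMillis elapses.
     * pollMillis is assumed to be non-negative.
     * Returns true if the condition held, false on timeout or interruption.
     */
    static boolean awaitCondition(java.util.function.BooleanSupplier condition,
                                  long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime()
                + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            long remainingMillis =
                    java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(remainingNanos);
            try {
                // Never sleep past the deadline, and sleep at least 1 ms to avoid spinning.
                Thread.sleep(Math.min(pollMillis, Math.max(1L, remainingMillis)));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

   In the test this would replace the sleep with an assertion on the return value, so a slow reconnect fails with a clear timeout rather than a flaky read.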
   
   
