You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/15 03:27:05 UTC

[pulsar] branch master updated: Feature - reset cursor on Reader to current position (#4331)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c51adc  Feature - reset cursor on Reader to current position (#4331)
1c51adc is described below

commit 1c51adc7114ab7aaf1c54e6e022920ed59f56cfa
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Sat Jun 15 00:26:59 2019 -0300

    Feature - reset cursor on Reader to current position (#4331)
    
    * Feature - reset cursor on Reader to current position
    
    *Motivation*
    
    There are some cases in which it is useful to be able to include the message
    at the current position when a cursor reset is performed.
    
    This was reported by `vvy` on Slack; no issue has been created to track it.
    
    *Modifications*
    
      - Add startMessageIdInclusive() to ReaderBuilder to support including the
        current position of the reset.
      - Add resetIncludeHead field for Reader and Consumer Configuration Data
      - Fix the position of the cursor for non-durable consumers.
      - Improve the discard `if` statement for batch-enabled mode.
      - Add a discard `if` statement for batch-disabled mode.
      - Improve test case for latest Reader seek.
      - Add a test case asserting that a reader started at a specific message id
        begins at the expected position, with these data provider scenarios:
          A. Batching enabled and start-inclusive enabled.
          B. Batching enabled and start-inclusive disabled.
          C. Batching disabled and start-inclusive enabled.
          D. Batching disabled and start-inclusive disabled.
---
 .../broker/service/persistent/PersistentTopic.java |   9 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 126 +++++++++++++++++----
 .../apache/pulsar/client/api/ReaderBuilder.java    |  14 +++
 pulsar-client-cpp/python/pulsar_test.py            |  13 ++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  45 ++++++--
 .../pulsar/client/impl/ReaderBuilderImpl.java      |   6 +
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   5 +
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../client/impl/conf/ReaderConfigurationData.java  |   1 +
 9 files changed, 181 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1110b29..0f132c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -547,7 +547,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 future.completeExceptionally(e);
             }
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to create subscription for {}: ", topic, subscriptionName, ex.getMessage());
+            log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage());
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
             return null;
@@ -607,11 +607,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (msgId instanceof BatchMessageIdImpl) {
                 // When the start message is relative to a batch, we need to take one step back on the previous message,
                 // because the "batch" might not have been consumed in its entirety.
-                // The client will then be able to discard the first messages in the batch.
-                if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0) {
-                    entryId = msgId.getEntryId() - 1;
-                }
+                // The client will then be able to discard the first messages if needed.
+                entryId = msgId.getEntryId() - 1;
             }
+
             Position startPosition = new PositionImpl(ledgerId, entryId);
             ManagedCursor cursor = null;
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 4bfa589..53ff341 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -27,10 +27,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -42,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TopicReaderTest extends ProducerConsumerBase {
@@ -60,6 +58,31 @@ public class TopicReaderTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @DataProvider
+    public static Object[][] variationsForExpectedPos() {
+        return new Object[][] {
+                // batching / start-inclusive / num-of-messages
+                {true, true, 10 },
+                {true, false, 10 },
+                {false, true, 10 },
+                {false, false, 10 },
+
+                {true, true, 100 },
+                {true, false, 100 },
+                {false, true, 100 },
+                {false, false, 100 },
+        };
+    }
+
+    @DataProvider
+    public static Object[][] variationsForResetOnLatestMsg() {
+        return new Object[][] {
+                // start-inclusive / num-of-messages
+                {true, 20},
+                {false, 20}
+        };
+    }
+
     @Test
     public void testSimpleReader() throws Exception {
         Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
@@ -178,36 +201,46 @@ public class TopicReaderTest extends ProducerConsumerBase {
         assertEquals(stats.subscriptions.size(), 0);
     }
 
-    @Test
-    public void testReaderOnLastMessage() throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnLastMessage")
+    @Test(dataProvider = "variationsForResetOnLatestMsg")
+    public void testReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/ReaderOnLatestMessage";
+        final int halfOfMsgs = numOfMessages / 2;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
                 .create();
-        for (int i = 0; i < 10; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
+
+        for (int i = 0; i < halfOfMsgs; i++) {
+            producer.send(String.format("my-message-%d", i).getBytes());
         }
 
-        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnLastMessage")
-                .startMessageId(MessageId.latest).create();
+        ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
+                .topic(topicName)
+                .startMessageId(MessageId.latest);
 
-        for (int i = 10; i < 20; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
+        if (startInclusive) {
+            readerBuilder.startMessageIdInclusive();
         }
 
-        // Publish more messages and verify the readers only sees new messages
+        Reader<byte[]> reader = readerBuilder.create();
 
-        Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
-        for (int i = 10; i < 20; i++) {
-            msg = reader.readNext(1, TimeUnit.SECONDS);
+        for (int i = halfOfMsgs; i < numOfMessages; i++) {
+            producer.send(String.format("my-message-%d", i).getBytes());
+        }
 
-            String receivedMessage = new String(msg.getData());
-            log.debug("Received message: [{}]", receivedMessage);
-            String expectedMessage = "my-message-" + i;
+        // Publish more messages and verify the readers only sees new messages
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = halfOfMsgs; i < numOfMessages; i++) {
+            Message<byte[]> message = reader.readNext();
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("my-message-%d", i);
             testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
         }
 
+        assertTrue(reader.isConnected());
+        assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0);
+        assertEquals(messageSet.size(), halfOfMsgs);
+
         // Acknowledge the consumption of all messages at once
         reader.close();
         producer.close();
@@ -650,4 +683,51 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.close();
         producer.close();
     }
+
+    @Test(dataProvider = "variationsForExpectedPos")
+    public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
+            throws Exception {
+        final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
+        final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
+        final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(batching)
+                .create();
+
+        MessageId resetPos = null;
+        for (int i = 0; i < numOfMessages; i++) {
+            MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
+            if (resetIndex == i) {
+                resetPos = msgId;
+            }
+        }
+
+        ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
+                .topic(topicName)
+                .startMessageId(resetPos);
+
+        if (startInclusive) {
+            readerBuilder.startMessageIdInclusive();
+        }
+
+        Reader<byte[]> reader = readerBuilder.create();
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = firstMessage; i < numOfMessages; i++) {
+            Message<byte[]> message = reader.readNext();
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("msg num %d", i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+
+        assertTrue(reader.isConnected());
+        assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0);
+
+        // Processed messages should be the number of messages in the range: [FirstResetMessage..TotalNumOfMessages]
+        assertEquals(messageSet.size(), numOfMessages - firstMessage);
+
+        reader.close();
+        producer.close();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 792cf7b..8793eac 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -108,6 +108,7 @@ public interface ReaderBuilder<T> extends Cloneable {
 
     /**
      * The initial reader positioning is done by specifying a message id. The options are:
+     * <p>
      * <ul>
      * <li>{@link MessageId#earliest}: Start reading from the earliest message available in the topic</li>
      * <li>{@link MessageId#latest}: Start reading from end of the topic. The first message read will be the one
@@ -116,12 +117,25 @@ public interface ReaderBuilder<T> extends Cloneable {
      * immediately <b>*after*</b> the specified message</li>
      * </ul>
      *
+     * <p>
+     * If the first message <b>*after*</b> the specified message is not the desired behaviour, use
+     * {@link ReaderBuilder#startMessageIdInclusive()}.
+     *
      * @param startMessageId the message id where the reader will be initially positioned on
      * @return the reader builder instance
      */
     ReaderBuilder<T> startMessageId(MessageId startMessageId);
 
     /**
+     * Set the reader to include the given position of {@link ReaderBuilder#startMessageId(MessageId)}
+     * <p>
+     * This configuration option also applies for any cursor reset operation like {@link Reader#seek(MessageId)}.
+     *
+     * @return the reader builder instance
+     */
+    ReaderBuilder<T> startMessageIdInclusive();
+
+    /**
      * Sets a {@link ReaderListener} for the reader
      * <p>
      * When a {@link ReaderListener} is set, application will receive messages through it. Calls to
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 6e4a14b..36d7fca 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -370,26 +370,33 @@ class PulsarTest(TestCase):
         client.close()
 
     def test_reader_on_specific_message(self):
+        num_of_msgs = 10
         client = Client(self.serviceUrl)
         producer = client.create_producer(
             'my-python-topic-reader-on-specific-message')
 
-        for i in range(10):
+        for i in range(num_of_msgs):
             producer.send(b'hello-%d' % i)
 
         reader1 = client.create_reader(
                 'my-python-topic-reader-on-specific-message',
                 MessageId.earliest)
 
-        for i in range(5):
+        for i in range(num_of_msgs/2):
             msg = reader1.read_next()
+            self.assertTrue(msg)
+            self.assertEqual(msg.data(), b'hello-%d' % i)
             last_msg_id = msg.message_id()
+            last_msg_idx = i
 
         reader2 = client.create_reader(
                 'my-python-topic-reader-on-specific-message',
                 last_msg_id)
 
-        for i in range(5, 10):
+        # The reset would be effectively done on the next position relative to reset.
+        # When available, we should test this behaviour with `startMessageIdInclusive` opt.
+        from_msg_idx = last_msg_idx
+        for i in range(from_msg_idx, num_of_msgs):
             msg = reader2.read_next()
             self.assertTrue(msg)
             self.assertEqual(msg.data(), b'hello-%d' % i)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5548356..f329cd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -124,6 +124,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final Map<String, String> metadata;
 
     private final boolean readCompacted;
+    private final boolean resetIncludeHead;
 
     private final SubscriptionInitialPosition subscriptionInitialPosition;
     private final ConnectionHandler connectionHandler;
@@ -187,6 +188,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
         this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
+        this.resetIncludeHead = conf.isResetIncludeHead();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -795,9 +797,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
-            final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId,
-                                                             msgMetadata, uncompressedPayload,
-                                                             createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
+
+            if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+                // We need to discard entries that were prior to startMessageId
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+                            consumerName, startMessageId);
+                }
+
+                uncompressedPayload.release();
+                msgMetadata.recycle();
+                return;
+            }
+
+            final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId, msgMetadata,
+                    uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
             uncompressedPayload.release();
             msgMetadata.recycle();
 
@@ -938,15 +952,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
                         singleMessageMetadataBuilder, i, batchSize);
 
-                if (subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null
-                        && messageId.getLedgerId() == startMessageId.getLedgerId()
-                        && messageId.getEntryId() == startMessageId.getEntryId()
-                        && i <= startMessageId.getBatchIndex()) {
+                if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorBatchIndex(i)) {
                     // If we are receiving a batch message, we need to discard messages that were prior
                     // to the startMessageId
                     if (log.isDebugEnabled()) {
-                        log.debug("[{}] [{}] Ignoring message from before the startMessageId", subscription,
-                                consumerName);
+                        log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+                                consumerName, startMessageId);
                     }
                     singleMessagePayload.release();
                     singleMessageMetadataBuilder.recycle();
@@ -954,6 +965,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     ++skippedMessages;
                     continue;
                 }
+
                 if (singleMessageMetadataBuilder.getCompactedOut()) {
                     // message has been compacted out, so don't send to the user
                     singleMessagePayload.release();
@@ -988,6 +1000,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
             discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
         }
+
         if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter);
         }
@@ -1002,6 +1015,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
     }
 
+    private boolean isPriorEntryIndex(long idx) {
+        return resetIncludeHead ? idx < startMessageId.getEntryId() : idx <= startMessageId.getEntryId();
+    }
+
+    private boolean isPriorBatchIndex(long idx) {
+        return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
+    }
+
+    private boolean isNonDurableAndSameEntryAndLedger(MessageIdData messageId) {
+        return subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null
+                && messageId.getLedgerId() == startMessageId.getLedgerId()
+                && messageId.getEntryId() == startMessageId.getEntryId();
+    }
+
     /**
      * Record the event that one message has been processed by the application.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 9c28e03..dc898cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -115,6 +115,12 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
     }
 
     @Override
+    public ReaderBuilder<T> startMessageIdInclusive() {
+        conf.setResetIncludeHead(true);
+        return this;
+    }
+
+    @Override
     public ReaderBuilder<T> readerListener(ReaderListener<T> readerListener) {
         conf.setReaderListener(readerListener);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 26a2ad7..71953b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -49,10 +49,15 @@ public class ReaderImpl<T> implements Reader<T> {
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
+
         if (readerConfiguration.getReaderName() != null) {
             consumerConfiguration.setConsumerName(readerConfiguration.getReaderName());
         }
 
+        if (readerConfiguration.isResetIncludeHead()) {
+            consumerConfiguration.setResetIncludeHead(true);
+        }
+
         if (readerConfiguration.getReaderListener() != null) {
             ReaderListener<T> readerListener = readerConfiguration.getReaderListener();
             consumerConfiguration.setMessageListener(new MessageListener<T>() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 8aca7b2..bb7c92d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -101,6 +101,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean replicateSubscriptionState = false;
 
+    private boolean resetIncludeHead = false;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 6645c9c..d6cf1cb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -47,6 +47,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
     private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
 
     private boolean readCompacted = false;
+    private boolean resetIncludeHead = false;
 
     @SuppressWarnings("unchecked")
     public ReaderConfigurationData<T> clone() {