You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/15 03:27:05 UTC
[pulsar] branch master updated: Feature - reset cursor on Reader to
current position (#4331)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1c51adc Feature - reset cursor on Reader to current position (#4331)
1c51adc is described below
commit 1c51adc7114ab7aaf1c54e6e022920ed59f56cfa
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Sat Jun 15 00:26:59 2019 -0300
Feature - reset cursor on Reader to current position (#4331)
* Feature - reset cursor on Reader to current position
*Motivation*
There are some cases in which it is useful to be able to include the current
position of the message when a cursor reset is made.
This was reported by `vvy` on Slack; no issue has been created to track this.
*Modifications*
- Add startMessageIdInclusive() on ReaderBuilder to support including the
current position on reset.
- Add resetIncludeHead field for Reader and Consumer Configuration Data
- Fix position of cursor for non durable consumer.
- Improve the discard if-statement for batch-enabled mode.
- Add a discard if-statement for batch-disabled mode.
- Improve test case for latest Reader seek.
- Add test case to assert the start of specific message id at the expected
position with data provider scenarios:
A. Batch enable and start inclusive enable.
B. Batch enable and start inclusive disable.
C. Batch disable and start inclusive enable.
D. Batch disable and start inclusive disable.
---
.../broker/service/persistent/PersistentTopic.java | 9 +-
.../apache/pulsar/client/api/TopicReaderTest.java | 126 +++++++++++++++++----
.../apache/pulsar/client/api/ReaderBuilder.java | 14 +++
pulsar-client-cpp/python/pulsar_test.py | 13 ++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 45 ++++++--
.../pulsar/client/impl/ReaderBuilderImpl.java | 6 +
.../org/apache/pulsar/client/impl/ReaderImpl.java | 5 +
.../impl/conf/ConsumerConfigurationData.java | 2 +
.../client/impl/conf/ReaderConfigurationData.java | 1 +
9 files changed, 181 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1110b29..0f132c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -547,7 +547,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
future.completeExceptionally(e);
}
}).exceptionally(ex -> {
- log.warn("[{}] Failed to create subscription for {}: ", topic, subscriptionName, ex.getMessage());
+ log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage());
USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
future.completeExceptionally(new PersistenceException(ex));
return null;
@@ -607,11 +607,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
- // The client will then be able to discard the first messages in the batch.
- if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0) {
- entryId = msgId.getEntryId() - 1;
- }
+ // The client will then be able to discard the first messages if needed.
+ entryId = msgId.getEntryId() - 1;
}
+
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 4bfa589..53ff341 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -27,10 +27,7 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
@@ -42,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TopicReaderTest extends ProducerConsumerBase {
@@ -60,6 +58,31 @@ public class TopicReaderTest extends ProducerConsumerBase {
super.internalCleanup();
}
+ @DataProvider
+ public static Object[][] variationsForExpectedPos() {
+ return new Object[][] {
+ // batching / start-inclusive / num-of-messages
+ {true, true, 10 },
+ {true, false, 10 },
+ {false, true, 10 },
+ {false, false, 10 },
+
+ {true, true, 100 },
+ {true, false, 100 },
+ {false, true, 100 },
+ {false, false, 100 },
+ };
+ }
+
+ @DataProvider
+ public static Object[][] variationsForResetOnLatestMsg() {
+ return new Object[][] {
+ // start-inclusive / num-of-messages
+ {true, 20},
+ {false, 20}
+ };
+ }
+
@Test
public void testSimpleReader() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
@@ -178,36 +201,46 @@ public class TopicReaderTest extends ProducerConsumerBase {
assertEquals(stats.subscriptions.size(), 0);
}
- @Test
- public void testReaderOnLastMessage() throws Exception {
- Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnLastMessage")
+ @Test(dataProvider = "variationsForResetOnLatestMsg")
+ public void testReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception {
+ final String topicName = "persistent://my-property/my-ns/ReaderOnLatestMessage";
+ final int halfOfMsgs = numOfMessages / 2;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
.create();
- for (int i = 0; i < 10; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
+
+ for (int i = 0; i < halfOfMsgs; i++) {
+ producer.send(String.format("my-message-%d", i).getBytes());
}
- Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnLastMessage")
- .startMessageId(MessageId.latest).create();
+ ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
+ .topic(topicName)
+ .startMessageId(MessageId.latest);
- for (int i = 10; i < 20; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
+ if (startInclusive) {
+ readerBuilder.startMessageIdInclusive();
}
- // Publish more messages and verify the readers only sees new messages
+ Reader<byte[]> reader = readerBuilder.create();
- Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
- for (int i = 10; i < 20; i++) {
- msg = reader.readNext(1, TimeUnit.SECONDS);
+ for (int i = halfOfMsgs; i < numOfMessages; i++) {
+ producer.send(String.format("my-message-%d", i).getBytes());
+ }
- String receivedMessage = new String(msg.getData());
- log.debug("Received message: [{}]", receivedMessage);
- String expectedMessage = "my-message-" + i;
+ // Publish more messages and verify the readers only sees new messages
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = halfOfMsgs; i < numOfMessages; i++) {
+ Message<byte[]> message = reader.readNext();
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("my-message-%d", i);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
+ assertTrue(reader.isConnected());
+ assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0);
+ assertEquals(messageSet.size(), halfOfMsgs);
+
// Acknowledge the consumption of all messages at once
reader.close();
producer.close();
@@ -650,4 +683,51 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.close();
producer.close();
}
+
+ @Test(dataProvider = "variationsForExpectedPos")
+ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
+ throws Exception {
+ final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
+ final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
+ final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(batching)
+ .create();
+
+ MessageId resetPos = null;
+ for (int i = 0; i < numOfMessages; i++) {
+ MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
+ if (resetIndex == i) {
+ resetPos = msgId;
+ }
+ }
+
+ ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
+ .topic(topicName)
+ .startMessageId(resetPos);
+
+ if (startInclusive) {
+ readerBuilder.startMessageIdInclusive();
+ }
+
+ Reader<byte[]> reader = readerBuilder.create();
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = firstMessage; i < numOfMessages; i++) {
+ Message<byte[]> message = reader.readNext();
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("msg num %d", i);
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+
+ assertTrue(reader.isConnected());
+ assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0);
+
+ // Processed messages should be the number of messages in the range: [FirstResetMessage..TotalNumOfMessages]
+ assertEquals(messageSet.size(), numOfMessages - firstMessage);
+
+ reader.close();
+ producer.close();
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 792cf7b..8793eac 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -108,6 +108,7 @@ public interface ReaderBuilder<T> extends Cloneable {
/**
* The initial reader positioning is done by specifying a message id. The options are:
+ * <p>
* <ul>
* <li>{@link MessageId#earliest}: Start reading from the earliest message available in the topic</li>
* <li>{@link MessageId#latest}: Start reading from end of the topic. The first message read will be the one
@@ -116,12 +117,25 @@ public interface ReaderBuilder<T> extends Cloneable {
* immediately <b>*after*</b> the specified message</li>
* </ul>
*
+ * <p>
+ * If the first message <b>*after*</b> the specified message is not the desired behaviour, use
+ * {@link ReaderBuilder#startMessageIdInclusive()}.
+ *
* @param startMessageId the message id where the reader will be initially positioned on
* @return the reader builder instance
*/
ReaderBuilder<T> startMessageId(MessageId startMessageId);
/**
+ * Set the reader to include the given position of {@link ReaderBuilder#startMessageId(MessageId)}
+ * <p>
+ * This configuration option also applies for any cursor reset operation like {@link Reader#seek(MessageId)}.
+ *
+ * @return the reader builder instance
+ */
+ ReaderBuilder<T> startMessageIdInclusive();
+
+ /**
* Sets a {@link ReaderListener} for the reader
* <p>
* When a {@link ReaderListener} is set, application will receive messages through it. Calls to
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 6e4a14b..36d7fca 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -370,26 +370,33 @@ class PulsarTest(TestCase):
client.close()
def test_reader_on_specific_message(self):
+ num_of_msgs = 10
client = Client(self.serviceUrl)
producer = client.create_producer(
'my-python-topic-reader-on-specific-message')
- for i in range(10):
+ for i in range(num_of_msgs):
producer.send(b'hello-%d' % i)
reader1 = client.create_reader(
'my-python-topic-reader-on-specific-message',
MessageId.earliest)
- for i in range(5):
+ for i in range(num_of_msgs/2):
msg = reader1.read_next()
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b'hello-%d' % i)
last_msg_id = msg.message_id()
+ last_msg_idx = i
reader2 = client.create_reader(
'my-python-topic-reader-on-specific-message',
last_msg_id)
- for i in range(5, 10):
+ # The reset would be effectively done on the next position relative to reset.
+ # When available, we should test this behaviour with `startMessageIdInclusive` opt.
+ from_msg_idx = last_msg_idx
+ for i in range(from_msg_idx, num_of_msgs):
msg = reader2.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5548356..f329cd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -124,6 +124,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final Map<String, String> metadata;
private final boolean readCompacted;
+ private final boolean resetIncludeHead;
private final SubscriptionInitialPosition subscriptionInitialPosition;
private final ConnectionHandler connectionHandler;
@@ -187,6 +188,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
+ this.resetIncludeHead = conf.isResetIncludeHead();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -795,9 +797,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
- final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId,
- msgMetadata, uncompressedPayload,
- createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
+
+ if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+ // We need to discard entries that were prior to startMessageId
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+ consumerName, startMessageId);
+ }
+
+ uncompressedPayload.release();
+ msgMetadata.recycle();
+ return;
+ }
+
+ final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId, msgMetadata,
+ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
uncompressedPayload.release();
msgMetadata.recycle();
@@ -938,15 +952,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadataBuilder, i, batchSize);
- if (subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null
- && messageId.getLedgerId() == startMessageId.getLedgerId()
- && messageId.getEntryId() == startMessageId.getEntryId()
- && i <= startMessageId.getBatchIndex()) {
+ if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorBatchIndex(i)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Ignoring message from before the startMessageId", subscription,
- consumerName);
+ log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+ consumerName, startMessageId);
}
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
@@ -954,6 +965,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
++skippedMessages;
continue;
}
+
if (singleMessageMetadataBuilder.getCompactedOut()) {
// message has been compacted out, so don't send to the user
singleMessagePayload.release();
@@ -988,6 +1000,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
}
+
if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter);
}
@@ -1002,6 +1015,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
+ private boolean isPriorEntryIndex(long idx) {
+ return resetIncludeHead ? idx < startMessageId.getEntryId() : idx <= startMessageId.getEntryId();
+ }
+
+ private boolean isPriorBatchIndex(long idx) {
+ return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
+ }
+
+ private boolean isNonDurableAndSameEntryAndLedger(MessageIdData messageId) {
+ return subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null
+ && messageId.getLedgerId() == startMessageId.getLedgerId()
+ && messageId.getEntryId() == startMessageId.getEntryId();
+ }
+
/**
* Record the event that one message has been processed by the application.
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 9c28e03..dc898cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -115,6 +115,12 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
}
@Override
+ public ReaderBuilder<T> startMessageIdInclusive() {
+ conf.setResetIncludeHead(true);
+ return this;
+ }
+
+ @Override
public ReaderBuilder<T> readerListener(ReaderListener<T> readerListener) {
conf.setReaderListener(readerListener);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 26a2ad7..71953b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -49,10 +49,15 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
+
if (readerConfiguration.getReaderName() != null) {
consumerConfiguration.setConsumerName(readerConfiguration.getReaderName());
}
+ if (readerConfiguration.isResetIncludeHead()) {
+ consumerConfiguration.setResetIncludeHead(true);
+ }
+
if (readerConfiguration.getReaderListener() != null) {
ReaderListener<T> readerListener = readerConfiguration.getReaderListener();
consumerConfiguration.setMessageListener(new MessageListener<T>() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 8aca7b2..bb7c92d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -101,6 +101,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean replicateSubscriptionState = false;
+ private boolean resetIncludeHead = false;
+
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 6645c9c..d6cf1cb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -47,6 +47,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
private boolean readCompacted = false;
+ private boolean resetIncludeHead = false;
@SuppressWarnings("unchecked")
public ReaderConfigurationData<T> clone() {