You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/20 01:34:18 UTC
[pulsar] branch master updated: Fix read batching message by pulsar
reader (#3830)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9ab80c8 Fix read batching message by pulsar reader (#3830)
9ab80c8 is described below
commit 9ab80c8180d09e034d779444c4fb62b874d9720f
Author: penghui <co...@gmail.com>
AuthorDate: Wed Mar 20 09:34:12 2019 +0800
Fix read batching message by pulsar reader (#3830)
### Motivation
Fix a bug in reader.hasMessageAvailable().
```java
while (reader.hasMessageAvailable()) {
Message msg = reader.readNext();
// Do something
}
```
If lastDequeuedMessage is a batch message id, reader.hasMessageAvailable() returns false after the first message of the batch has been read, because the message id it is compared against (lastMessageIdInBroker) is a plain MessageIdImpl.
### Modifications
Add a check for batched messages.
### Verifying this change
- [x] This change added tests and can be verified as follows:
- *Added unit tests for reading batched and non-batched messages*
---
.../org/apache/pulsar/client/impl/ReaderTest.java | 115 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 20 ++--
2 files changed, 129 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
new file mode 100644
index 0000000..41ed9cf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/** Checks that a Reader started at MessageId.earliest drains every published message, batched or not. */
+public class ReaderTest extends MockedPulsarServiceBaseTest {
+
+    private static final String subscription = "reader-sub";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** Sends {@code count} messages keyed "key0", "key1", ...; waits on the last send; returns the keys. */
+    private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
+        Set<String> keys = new HashSet<>();
+        ProducerBuilder<byte[]> builder = pulsarClient.newProducer()
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .maxPendingMessages(count)
+                .topic(topic)
+                .enableBatching(enableBatch);
+        if (enableBatch) {
+            builder.batchingMaxMessages(count);
+        }
+        try (Producer<byte[]> producer = builder.create()) {
+            Future<?> lastFuture = null;
+            for (int i = 0; i < count; i++) {
+                String key = "key" + i;
+                byte[] data = ("my-message-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                lastFuture = producer.newMessage().key(key).value(data).sendAsync();
+                keys.add(key);
+            }
+            lastFuture.get();
+        }
+        return keys;
+    }
+
+    @Test
+    public void testReadMessageWithoutBatching() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic";
+        testReadMessages(topic, false);
+    }
+
+    @Test
+    public void testReadMessageWithBatching() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
+        testReadMessages(topic, true);
+    }
+
+    /** Publishes ten keyed messages, reads until hasMessageAvailable() is false, expects every key once. */
+    private void testReadMessages(String topic, boolean enableBatch) throws Exception {
+        int numKeys = 10;
+        Set<String> keys = publishMessages(topic, numKeys, enableBatch);
+        // try-with-resources closes the reader even if an assertion in the loop fails.
+        try (Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create()) {
+            while (reader.hasMessageAvailable()) {
+                Message<byte[]> message = reader.readNext();
+                Assert.assertTrue(keys.remove(message.getKey()));
+            }
+        }
+        Assert.assertTrue(keys.isEmpty());
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 499562f..a384d82 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1396,8 +1396,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
public boolean hasMessageAvailable() throws PulsarClientException {
try {
- if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
- ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+ if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
return true;
}
@@ -1410,14 +1409,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
- if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
- ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+ if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
booleanFuture.complete(true);
} else {
getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
- if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
- ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+ if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
booleanFuture.complete(true);
} else {
booleanFuture.complete(false);
@@ -1431,6 +1428,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return booleanFuture;
}
+ /** Tells whether the reader still has something to deliver for the given broker and dequeue positions. */
+ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) {
+ final boolean brokerIsAhead = lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0
+ && ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1;
+ if (brokerIsAhead) {
+ return true;
+ }
+ // Same position: the rest of a batch may still be waiting in incomingMessages, so keep reading.
+ return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0 && incomingMessages.size() > 0;
+ }
+
CompletableFuture<MessageId> getLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil