You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/20 01:34:18 UTC

[pulsar] branch master updated: Fix read batching message by pulsar reader (#3830)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ab80c8  Fix read batching message by pulsar reader (#3830)
9ab80c8 is described below

commit 9ab80c8180d09e034d779444c4fb62b874d9720f
Author: penghui <co...@gmail.com>
AuthorDate: Wed Mar 20 09:34:12 2019 +0800

    Fix read batching message by pulsar reader (#3830)
    
    ### Motivation
    
    Fix a bug in reader.hasMessageAvailable().
    
    ```java
    while (reader.hasMessageAvailable()) {
        Message msg = reader.readNext();
        // Do something
    }
    ```
    
    If lastDequeuedMessage is a batch message id, reader.hasMessageAvailable() will return false after the first message of the batch has been read, because the message id it is compared against (lastMessageIdInBroker) is a MessageIdImpl.
    
    ### Modifications
    
    Add a check for batched messages.
    
    ### Verifying this change
    
    - [x] This change added tests and can be verified as follows:
    - *Added unit tests for reading batched and non-batched messages*
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 115 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  20 ++--
 2 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
new file mode 100644
index 0000000..41ed9cf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+public class ReaderTest extends MockedPulsarServiceBaseTest {
+
+    private static final String subscription = "reader-sub";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("test",
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
+        Set<String> keys = new HashSet<>();
+        ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
+        builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
+        builder.maxPendingMessages(count);
+        builder.topic(topic);
+        if (enableBatch) {
+            builder.enableBatching(true);
+            builder.batchingMaxMessages(count);
+        } else {
+            builder.enableBatching(false);
+        }
+        try (Producer<byte[]> producer = builder.create()) {
+            Future<?> lastFuture = null;
+            for (int i = 0; i < count; i++) {
+                String key = "key"+i;
+                byte[] data = ("my-message-" + i).getBytes();
+                lastFuture = producer.newMessage().key(key).value(data).sendAsync();
+                keys.add(key);
+            }
+            lastFuture.get();
+        }
+        return keys;
+    }
+
+    @Test
+    public void testReadMessageWithoutBatching() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic";
+        testReadMessages(topic, false);
+    }
+
+    @Test
+    public void testReadMessageWithBatching() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
+        testReadMessages(topic, true);
+    }
+
+    private void testReadMessages(String topic, boolean enableBatch) throws Exception {
+        int numKeys = 10;
+
+        Set<String> keys = publishMessages(topic, numKeys, enableBatch);
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create();
+
+        while (reader.hasMessageAvailable()) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertTrue(keys.remove(message.getKey()));
+        }
+        Assert.assertTrue(keys.isEmpty());
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 499562f..a384d82 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1396,8 +1396,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     public boolean hasMessageAvailable() throws PulsarClientException {
         try {
-            if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
-                ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
                 return true;
             }
 
@@ -1410,14 +1409,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     public CompletableFuture<Boolean> hasMessageAvailableAsync() {
         final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
 
-        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
-            ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+        if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
             booleanFuture.complete(true);
         } else {
             getLastMessageIdAsync().thenAccept(messageId -> {
                 lastMessageIdInBroker = messageId;
-                if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
-                    ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+                if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
                     booleanFuture.complete(true);
                 } else {
                     booleanFuture.complete(false);
@@ -1431,6 +1428,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return booleanFuture;
     }
 
+    private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) {
+        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            return true;
+        } else {
+            // Ids are equal but incomingMessages is non-empty: report more messages so a batch can be read completely.
+            return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
+                && incomingMessages.size() > 0;
+        }
+    }
+
     CompletableFuture<MessageId> getLastMessageIdAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil