You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:53 UTC

[pulsar] 07/38: fix_admin_getIndividualMsgsFromBatch_bug (#6715)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e75e0a06cc0ab3af38f88ba5544202b624e4d69d
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Tue Apr 14 12:10:28 2020 +0800

    fix_admin_getIndividualMsgsFromBatch_bug (#6715)
    
    **Motivation**
    Fix: when reading a batch message from the HTTP response, only the first message in the batch was returned. See #6714.
    ```java
    for (int i = 0; i < batchSize; i++) {
                String batchMsgId = msgId + ":" + i;
                PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
                        .newBuilder();
                ByteBuf buf = Unpooled.wrappedBuffer(data); // here you need to move out of the loop
                try {
                    ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i,
                            batchSize);
                    SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
                    if (singleMessageMetadata.getPropertiesCount() > 0) {
                        for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
                            properties.put(entry.getKey(), entry.getValue());
                        }
                    }
                    ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
                } catch (Exception ex) {
                    log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
                }
                buf.release();
                singleMessageMetadataBuilder.recycle();
            }
    ```
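    `ByteBuf buf` needs to be moved out of the loop: wrapping `data` again on every iteration starts reading from the beginning of the batch each time, so every iteration yields the first message.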
    
    **Changes**
    Create the `ByteBuf` once before the loop and release it once after the loop:
    ```java
     ByteBuf buf = Unpooled.wrappedBuffer(data);
            for (int i = 0; i < batchSize; i++) {
                String batchMsgId = msgId + ":" + i;
                PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
                        .newBuilder();
                try {
                    ByteBuf singleMessagePayload =
                            Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i, batchSize);
                    SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
                    if (singleMessageMetadata.getPropertiesCount() > 0) {
                        for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
                            properties.put(entry.getKey(), entry.getValue());
                        }
                    }
                    ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
                } catch (Exception ex) {
                    log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
                }
                singleMessageMetadataBuilder.recycle();
            }
            buf.release();
    ```
    (cherry picked from commit 565e3196e0be0b52f78137fddcb0cfde655ee010)
---
 .../pulsar/broker/admin/AdminTopicApiTest.java     | 103 +++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |   4 +-
 2 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
new file mode 100644
index 0000000..2c4b7fc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.broker.admin.v2.PersistentTopics;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.Response;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class AdminTopicApiTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(AdminTopicApiTest.class);
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPeekMessages() throws Exception {
+        @Cleanup
+        PulsarClient newPulsarClient = PulsarClient.builder()
+            .serviceUrl(lookupUrl.toString())
+            .build();
+
+        final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
+
+        @Cleanup
+        Consumer<byte[]> consumer = newPulsarClient.newConsumer()
+            .topic(topic)
+            .subscriptionName("my-sub")
+            .subscribe();
+
+        final int numMessages = 5;
+
+        @Cleanup
+        Producer<byte[]> producer = newPulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .batchingMaxMessages(5)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage()
+                .value(("value-" + i).getBytes(UTF_8))
+                .sendAsync();
+        }
+        producer.flush();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive();
+            log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
+        }
+        List<Message<byte[]>> messages = admin.topics().peekMessages(topic, "my-sub", 5);
+        Assert.assertEquals(new String(messages.get(0).getValue(), UTF_8), "value-0");
+        Assert.assertEquals(new String(messages.get(1).getValue(), UTF_8), "value-1");
+        Assert.assertEquals(new String(messages.get(2).getValue(), UTF_8), "value-2");
+        Assert.assertEquals(new String(messages.get(3).getValue(), UTF_8), "value-3");
+        Assert.assertEquals(new String(messages.get(4).getValue(), UTF_8), "value-4");
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3a1e685..0b9b3d6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1011,11 +1011,11 @@ public class TopicsImpl extends BaseResource implements Topics {
                                                              Map<String, String> properties) {
         List<Message<byte[]>> ret = new ArrayList<>();
         int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
+        ByteBuf buf = Unpooled.wrappedBuffer(data);
         for (int i = 0; i < batchSize; i++) {
             String batchMsgId = msgId + ":" + i;
             PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
                     .newBuilder();
-            ByteBuf buf = Unpooled.wrappedBuffer(data);
             try {
                 ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i,
                         batchSize);
@@ -1029,9 +1029,9 @@ public class TopicsImpl extends BaseResource implements Topics {
             } catch (Exception ex) {
                 log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
             }
-            buf.release();
             singleMessageMetadataBuilder.recycle();
         }
+        buf.release();
         return ret;
     }