You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:53 UTC
[pulsar] 07/38: fix_admin_getIndividualMsgsFromBatch_bug (#6715)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e75e0a06cc0ab3af38f88ba5544202b624e4d69d
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Tue Apr 14 12:10:28 2020 +0800
fix_admin_getIndividualMsgsFromBatch_bug (#6715)
**Motivation**
Fix: when individual messages are extracted from a batch message in the HTTP response, only the first message of the batch is returned. See #6714.
```java
for (int i = 0; i < batchSize; i++) {
String batchMsgId = msgId + ":" + i;
PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
.newBuilder();
ByteBuf buf = Unpooled.wrappedBuffer(data); // here you need to move out of the loop
try {
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i,
batchSize);
SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
if (singleMessageMetadata.getPropertiesCount() > 0) {
for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
buf.release();
singleMessageMetadataBuilder.recycle();
}
```
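The `ByteBuf buf` creation needs to move out of the loop: wrapping `data` again on every iteration starts reading from the beginning of the batch each time, so every iteration deserializes the first message instead of advancing to the next one.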
**Changes**
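Create the `ByteBuf` once before the loop and release it once after the loop, so that successive calls to `deSerializeSingleMessageInBatch` read successive messages from the same buffer: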
```java
ByteBuf buf = Unpooled.wrappedBuffer(data);
for (int i = 0; i < batchSize; i++) {
String batchMsgId = msgId + ":" + i;
PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
.newBuilder();
try {
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i, batchSize);
SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
if (singleMessageMetadata.getPropertiesCount() > 0) {
for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
singleMessageMetadataBuilder.recycle();
}
buf.release();
```
(cherry picked from commit 565e3196e0be0b52f78137fddcb0cfde655ee010)
---
.../pulsar/broker/admin/AdminTopicApiTest.java | 103 +++++++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 4 +-
2 files changed, 105 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
new file mode 100644
index 0000000..2c4b7fc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.broker.admin.v2.PersistentTopics;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.Response;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class AdminTopicApiTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(AdminTopicApiTest.class);
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testPeekMessages() throws Exception {
+ @Cleanup
+ PulsarClient newPulsarClient = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .build();
+
+ final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
+
+ @Cleanup
+ Consumer<byte[]> consumer = newPulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ final int numMessages = 5;
+
+ @Cleanup
+ Producer<byte[]> producer = newPulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+ .batchingMaxMessages(5)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage()
+ .value(("value-" + i).getBytes(UTF_8))
+ .sendAsync();
+ }
+ producer.flush();
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
+ }
+ List<Message<byte[]>> messages = admin.topics().peekMessages(topic, "my-sub", 5);
+ Assert.assertEquals(new String(messages.get(0).getValue(), UTF_8), "value-0");
+ Assert.assertEquals(new String(messages.get(1).getValue(), UTF_8), "value-1");
+ Assert.assertEquals(new String(messages.get(2).getValue(), UTF_8), "value-2");
+ Assert.assertEquals(new String(messages.get(3).getValue(), UTF_8), "value-3");
+ Assert.assertEquals(new String(messages.get(4).getValue(), UTF_8), "value-4");
+ }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3a1e685..0b9b3d6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1011,11 +1011,11 @@ public class TopicsImpl extends BaseResource implements Topics {
Map<String, String> properties) {
List<Message<byte[]>> ret = new ArrayList<>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
+ ByteBuf buf = Unpooled.wrappedBuffer(data);
for (int i = 0; i < batchSize; i++) {
String batchMsgId = msgId + ":" + i;
PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
.newBuilder();
- ByteBuf buf = Unpooled.wrappedBuffer(data);
try {
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i,
batchSize);
@@ -1029,9 +1029,9 @@ public class TopicsImpl extends BaseResource implements Topics {
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
- buf.release();
singleMessageMetadataBuilder.recycle();
}
+ buf.release();
return ret;
}