You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/26 17:53:13 UTC

[pulsar] branch master updated: Fix Websocket Consume Messages in Partitioned Topics (#2829)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d79499d  Fix Websocket Consume Messages in Partitioned Topics (#2829)
d79499d is described below

commit d79499dbe24fbd8b065489875ba86a47761ec33f
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Sat Oct 27 02:53:09 2018 +0900

    Fix Websocket Consume Messages in Partitioned Topics (#2829)
    
    * fix consume messages in partitioned topics on websocket
    
    * add consumeMessagesInPartitionedTopicTest
    
    * add fromByteArrayWithTopic
    
    * remove public
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 43 +++++++++++++++++++++-
 .../org/apache/pulsar/client/api/MessageId.java    |  5 +++
 .../apache/pulsar/client/impl/MessageIdImpl.java   | 31 ++++++++++++++++
 .../apache/pulsar/websocket/ConsumerHandler.java   |  3 +-
 4 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index e084b04..7f1b5aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -358,7 +358,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
                 + "/my-sub?subscriptionType=Failover";
         final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/";
         final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic;
-        System.out.println(consumerUri+", "+producerUri);
+        System.out.println(consumerUri + ", " + producerUri);
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
         URI readUri = URI.create(readerUri);
@@ -424,6 +424,47 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void consumeMessagesInPartitionedTopicTest() throws Exception { // Produces to a partitioned topic over WebSocket, then connects a WebSocket consumer to the same topic.
+        final String namespace = "my-property/my-ns";
+        final String topic = namespace + "/" + "my-topic7";
+        admin.topics().createPartitionedTopic("persistent://" + topic, 3); // topic is created with 3 partitions
+
+        final String subscription = "my-sub";
+        final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+        final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic;
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient = new WebSocketClient();
+        WebSocketClient produceClient = new WebSocketClient();
+
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try { // phase 1: connect the producer and publish
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            producerFuture.get(); // wait for the producer connect attempt to complete
+            produceSocket.sendMessage(100); // presumably publishes 100 messages - confirm against SimpleProducerSocket
+        } finally {
+            stopWebSocketClient(produceClient); // producer client is stopped before the consumer is started
+        }
+
+        Thread.sleep(500); // fixed pause between stopping the producer and starting the consumer
+
+        try { // phase 2: connect the consumer to the partitioned topic
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            consumerFuture.get(); // NOTE(review): nothing is asserted about received messages; the test only fails on an exception or the 10s timeout
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
index 1bb3e08..4d38aa9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import java.io.Serializable;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 
 /**
  * Opaque unique identifier of a single message
@@ -49,6 +50,10 @@ public interface MessageId extends Comparable<MessageId>, Serializable {
         return MessageIdImpl.fromByteArray(data);
     }
 
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { // Topic-aware variant of fromByteArray.
+        return MessageIdImpl.fromByteArrayWithTopic(data, topicName); // pure delegation; all decoding happens in MessageIdImpl
+    }
+
     public static final MessageId earliest = new MessageIdImpl(-1, -1, -1);
 
     public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 3686b12..5a53436 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.UninitializedMessageException;
@@ -113,6 +114,36 @@ public class MessageIdImpl implements MessageId {
         return messageId;
     }
 
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { // Decodes a serialized MessageIdData; wraps the id in a TopicMessageIdImpl when a partition is set and topicName is non-null.
+        checkNotNull(data); // null check on the input bytes; topicName is allowed to be null (see the guard below)
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length));
+        PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder();
+
+        PulsarApi.MessageIdData idData;
+        try {
+            idData = builder.mergeFrom(inputStream, null).build();
+        } catch (UninitializedMessageException e) {
+            throw new IOException(e); // rethrown as IOException with the original cause; NOTE(review): inputStream and builder are not recycled on this path
+        }
+
+        MessageId messageId;
+        if (idData.hasBatchIndex()) { // a batch index is present, so build a batch message id
+            messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
+                    idData.getBatchIndex());
+        } else {
+            messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
+        }
+        if (idData.getPartition() > -1 && topicName != null) { // presumably -1 marks "no partition" - confirm against MessageIdData
+            messageId = new TopicMessageIdImpl( // wraps the id built above together with the partition's topic name and the parent topic name
+                    topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId);
+        }
+
+        inputStream.recycle(); // the recycle() calls come only after every idData getter has been read
+        builder.recycle();
+        idData.recycle();
+        return messageId;
+    }
+
     // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
     protected byte[] toByteArray(int batchIndex) {
         MessageIdData.Builder builder = MessageIdData.newBuilder();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index cb21f6f..740a4e1 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException
 import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -220,7 +221,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         MessageId msgId;
         try {
             ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
-            msgId = MessageId.fromByteArray(Base64.getDecoder().decode(ack.messageId));
+            msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
         } catch (IOException e) {
             log.warn("Failed to deserialize message id: {}", message, e);
             close(WebSocketError.FailedToDeserializeFromJSON);