You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/26 17:53:13 UTC
[pulsar] branch master updated: Fix Websocket Consume Messages in
Partitioned Topics (#2829)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d79499d Fix Websocket Consume Messages in Partitioned Topics (#2829)
d79499d is described below
commit d79499dbe24fbd8b065489875ba86a47761ec33f
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Sat Oct 27 02:53:09 2018 +0900
Fix Websocket Consume Messages in Partitioned Topics (#2829)
* fix consume messages in partitioned topics on websocket
* add consumeMessagesInPartitionedTopicTest
* add fromByteArrayWithTopic
* remove public
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 43 +++++++++++++++++++++-
.../org/apache/pulsar/client/api/MessageId.java | 5 +++
.../apache/pulsar/client/impl/MessageIdImpl.java | 31 ++++++++++++++++
.../apache/pulsar/websocket/ConsumerHandler.java | 3 +-
4 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index e084b04..7f1b5aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -358,7 +358,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
+ "/my-sub?subscriptionType=Failover";
final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/";
final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic;
- System.out.println(consumerUri+", "+producerUri);
+ System.out.println(consumerUri + ", " + producerUri);
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
URI readUri = URI.create(readerUri);
@@ -424,6 +424,47 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
+    /**
+     * Creates a partitioned topic with 3 partitions, sends through the WebSocket producer endpoint,
+     * then opens a WebSocket consumer on the same topic and waits for its connection to be established.
+     * Each client is stopped in a {@code finally} block regardless of the outcome.
+     */
+    @Test(timeOut = 10000)
+    public void consumeMessagesInPartitionedTopicTest() throws Exception {
+        final String topic = "my-property/my-ns" + "/" + "my-topic7";
+        admin.topics().createPartitionedTopic("persistent://" + topic, 3);
+
+        final URI produceUri = URI.create("ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic);
+        final URI consumeUri = URI.create(
+                "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + "my-sub");
+
+        final WebSocketClient consumeClient = new WebSocketClient();
+        final WebSocketClient produceClient = new WebSocketClient();
+        final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        final SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        // Producer phase: connect, send, and always shut the client down afterwards.
+        try {
+            produceClient.start();
+            produceClient.connect(produceSocket, produceUri, new ClientUpgradeRequest()).get();
+            produceSocket.sendMessage(100);
+        } finally {
+            stopWebSocketClient(produceClient);
+        }
+
+        Thread.sleep(500);
+
+        // Consumer phase: only the connection handshake is awaited before the client is stopped.
+        try {
+            consumeClient.start();
+            consumeClient.connect(consumeSocket, consumeUri, new ClientUpgradeRequest()).get();
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+    }
+
private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
index 1bb3e08..4d38aa9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
/**
* Opaque unique identifier of a single message
@@ -49,6 +50,10 @@ public interface MessageId extends Comparable<MessageId>, Serializable {
return MessageIdImpl.fromByteArray(data);
}
+ /**
+ * De-serialize a message id from a byte array, taking the owning topic into account.
+ * <p>
+ * This is a thin entry point that forwards both arguments unchanged to
+ * {@link MessageIdImpl#fromByteArrayWithTopic(byte[], TopicName)}.
+ *
+ * @param data
+ *            byte array containing the serialized message id
+ * @param topicName
+ *            topic the message id is associated with
+ * @return the de-serialized message id
+ * @throws IOException
+ *             propagated from {@link MessageIdImpl#fromByteArrayWithTopic(byte[], TopicName)}
+ */
+ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
+ return MessageIdImpl.fromByteArrayWithTopic(data, topicName);
+ }
+
public static final MessageId earliest = new MessageIdImpl(-1, -1, -1);
public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 3686b12..5a53436 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.pulsar.shaded.com.google.protobuf.v241.UninitializedMessageException;
@@ -113,6 +114,36 @@ public class MessageIdImpl implements MessageId {
return messageId;
}
+    /**
+     * De-serialize a message id from its {@code MessageIdData} byte representation and, when the id
+     * carries a partition index and a topic is supplied, wrap it in a {@link TopicMessageIdImpl}.
+     *
+     * @param data
+     *            serialized {@code MessageIdData} bytes; checked with {@code checkNotNull}
+     * @param topicName
+     *            topic the id belongs to; when {@code null} the id is returned without topic wrapping
+     * @return a {@link BatchMessageIdImpl} if the decoded data has a batch index, otherwise a
+     *         {@link MessageIdImpl}; either one is wrapped in a {@link TopicMessageIdImpl} when the
+     *         decoded partition index is greater than -1 and {@code topicName} is not {@code null}
+     * @throws IOException
+     *             if the bytes cannot be decoded into a fully initialized {@code MessageIdData}
+     */
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
+        checkNotNull(data);
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream
+                .get(Unpooled.wrappedBuffer(data, 0, data.length));
+        PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder();
+        PulsarApi.MessageIdData idData = null;
+
+        try {
+            idData = builder.mergeFrom(inputStream, null).build();
+
+            MessageId messageId;
+            if (idData.hasBatchIndex()) {
+                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
+                        idData.getBatchIndex());
+            } else {
+                messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
+            }
+            if (idData.getPartition() > -1 && topicName != null) {
+                // First argument is the name of the specific partition, second the topic name as given.
+                messageId = new TopicMessageIdImpl(
+                        topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId);
+            }
+            return messageId;
+        } catch (UninitializedMessageException e) {
+            throw new IOException(e);
+        } finally {
+            // Recycle on every exit path: previously a decode failure skipped these calls entirely.
+            inputStream.recycle();
+            builder.recycle();
+            if (idData != null) {
+                idData.recycle();
+            }
+        }
+    }
+
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
protected byte[] toByteArray(int batchIndex) {
MessageIdData.Builder builder = MessageIdData.newBuilder();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index cb21f6f..740a4e1 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -220,7 +221,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
MessageId msgId;
try {
ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
- msgId = MessageId.fromByteArray(Base64.getDecoder().decode(ack.messageId));
+ msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);