You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/02 18:46:56 UTC
[pulsar] branch master updated: Allows consumer retrieve the
sequence id that the producer set. (#4645)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d107f67 Allows consumer retrieve the sequence id that the producer set. (#4645)
d107f67 is described below
commit d107f67662f2c7dcd0d4023095ec5bfc21eecfa4
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jul 3 02:46:50 2019 +0800
Allows consumer retrieve the sequence id that the producer set. (#4645)
* Allows consumer retrieve the sequence id that the producer set.
* fix comments.
---
.../pulsar/broker/service/BatchMessageTest.java | 64 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 4 ++
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 +++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 4 ++
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
5 files changed, 131 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 265e3ec..463837d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -737,5 +737,69 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
+ @Test(dataProvider = "containerBuilder")
+ private void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {
+
+ int numMsgs = 10;
+ final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdGenerated-" + UUID.randomUUID();
+ final String subscriptionName = "sub-1";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
+ .batcherBuilder(builder)
+ .create();
+
+ List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+ for (int i = 0; i < numMsgs; i++) {
+ byte[] message = ("my-message-" + i).getBytes();
+ sendFutureList.add(producer.sendAsync(message));
+ }
+ FutureUtil.waitForAll(sendFutureList).get();
+
+ for (int i = 0; i < numMsgs; i++) {
+ Message<byte[]> received = consumer.receive();
+ Assert.assertEquals(received.getSequenceId(), i);
+ consumer.acknowledge(received);
+ }
+
+ producer.close();
+ consumer.close();
+ }
+
+ @Test(dataProvider = "containerBuilder")
+ private void testRetrieveSequenceIdSpecify(BatcherBuilder builder) throws Exception {
+
+ int numMsgs = 10;
+ final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
+ final String subscriptionName = "sub-1";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
+ .batcherBuilder(builder)
+ .create();
+
+ List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+ for (int i = 0; i < numMsgs; i++) {
+ byte[] message = ("my-message-" + i).getBytes();
+ sendFutureList.add(producer.newMessage().sequenceId(i + 100).value(message).sendAsync());
+ }
+ FutureUtil.waitForAll(sendFutureList).get();
+
+ for (int i = 0; i < numMsgs; i++) {
+ Message<byte[]> received = consumer.receive();
+ Assert.assertEquals(received.getSequenceId(), i + 100);
+ consumer.acknowledge(received);
+ }
+
+ producer.close();
+ consumer.close();
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index fab8683..dea384f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -148,6 +148,10 @@ public class MessageImpl<T> implements Message<T> {
msgMetadataBuilder.setEventTime(singleMessageMetadata.getEventTime());
}
+ if (singleMessageMetadata.hasSequenceId()) {
+ msgMetadataBuilder.setSequenceId(singleMessageMetadata.getSequenceId());
+ }
+
this.schema = schema;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 3dcc367..4c156e5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4844,6 +4844,10 @@ public final class PulsarApi {
// optional bytes ordering_key = 7;
boolean hasOrderingKey();
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getOrderingKey();
+
+ // optional uint64 sequence_id = 8;
+ boolean hasSequenceId();
+ long getSequenceId();
}
public static final class SingleMessageMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4983,6 +4987,16 @@ public final class PulsarApi {
return orderingKey_;
}
+ // optional uint64 sequence_id = 8;
+ public static final int SEQUENCE_ID_FIELD_NUMBER = 8;
+ private long sequenceId_;
+ public boolean hasSequenceId() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public long getSequenceId() {
+ return sequenceId_;
+ }
+
private void initFields() {
properties_ = java.util.Collections.emptyList();
partitionKey_ = "";
@@ -4991,6 +5005,7 @@ public final class PulsarApi {
eventTime_ = 0L;
partitionKeyB64Encoded_ = false;
orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+ sequenceId_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5040,6 +5055,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBytes(7, orderingKey_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeUInt64(8, sequenceId_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -5076,6 +5094,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBytesSize(7, orderingKey_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(8, sequenceId_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -5203,6 +5225,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000020);
orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000040);
+ sequenceId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -5265,6 +5289,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000020;
}
result.orderingKey_ = orderingKey_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.sequenceId_ = sequenceId_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -5299,6 +5327,9 @@ public final class PulsarApi {
if (other.hasOrderingKey()) {
setOrderingKey(other.getOrderingKey());
}
+ if (other.hasSequenceId()) {
+ setSequenceId(other.getSequenceId());
+ }
return this;
}
@@ -5374,6 +5405,11 @@ public final class PulsarApi {
orderingKey_ = input.readBytes();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ sequenceId_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -5613,6 +5649,27 @@ public final class PulsarApi {
return this;
}
+ // optional uint64 sequence_id = 8;
+ private long sequenceId_ ;
+ public boolean hasSequenceId() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ public long getSequenceId() {
+ return sequenceId_;
+ }
+ public Builder setSequenceId(long value) {
+ bitField0_ |= 0x00000080;
+ sequenceId_ = value;
+
+ return this;
+ }
+ public Builder clearSequenceId() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ sequenceId_ = 0L;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index ba801d9..5af961c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1170,6 +1170,10 @@ public class Commands {
singleMessageMetadataBuilder.setEventTime(msgBuilder.getEventTime());
}
+ if (msgBuilder.hasSequenceId()) {
+ singleMessageMetadataBuilder.setSequenceId(msgBuilder.getSequenceId());
+ }
+
try {
return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
} finally {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index d6db53a..b9c7754 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -137,6 +137,8 @@ message SingleMessageMetadata {
optional bool partition_key_b64_encoded = 6 [ default = false ];
// Specifies a key that overrides the message key for ordering dispatch in Key_Shared mode.
optional bytes ordering_key = 7;
+ // Allows the consumer to retrieve the sequence id that the producer set.
+ optional uint64 sequence_id = 8;
}
enum ServerError {