You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/02 18:46:56 UTC

[pulsar] branch master updated: Allows consumer retrieve the sequence id that the producer set. (#4645)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d107f67  Allows consumer retrieve the sequence id that the producer set. (#4645)
d107f67 is described below

commit d107f67662f2c7dcd0d4023095ec5bfc21eecfa4
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jul 3 02:46:50 2019 +0800

    Allows consumer retrieve the sequence id that the producer set. (#4645)
    
    * Allows consumer retrieve the sequence id that the producer set.
    
    * fix comments.
---
 .../pulsar/broker/service/BatchMessageTest.java    | 64 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  4 ++
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 57 +++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  4 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  2 +
 5 files changed, 131 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 265e3ec..463837d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -737,5 +737,69 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
+    @Test(dataProvider = "containerBuilder")
+    private void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {
+
+        int numMsgs = 10;
+        final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdGenerated-" + UUID.randomUUID();
+        final String subscriptionName = "sub-1";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
+                .batcherBuilder(builder)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+        for (int i = 0; i < numMsgs; i++) {
+            byte[] message = ("my-message-" + i).getBytes();
+            sendFutureList.add(producer.sendAsync(message));
+        }
+        FutureUtil.waitForAll(sendFutureList).get();
+
+        for (int i = 0; i < numMsgs; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertEquals(received.getSequenceId(), i);
+            consumer.acknowledge(received);
+        }
+
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(dataProvider = "containerBuilder")
+    private void testRetrieveSequenceIdSpecify(BatcherBuilder builder) throws Exception {
+
+        int numMsgs = 10;
+        final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
+        final String subscriptionName = "sub-1";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
+                .batcherBuilder(builder)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+        for (int i = 0; i < numMsgs; i++) {
+            byte[] message = ("my-message-" + i).getBytes();
+            sendFutureList.add(producer.newMessage().sequenceId(i + 100).value(message).sendAsync());
+        }
+        FutureUtil.waitForAll(sendFutureList).get();
+
+        for (int i = 0; i < numMsgs; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertEquals(received.getSequenceId(), i + 100);
+            consumer.acknowledge(received);
+        }
+
+        producer.close();
+        consumer.close();
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index fab8683..dea384f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -148,6 +148,10 @@ public class MessageImpl<T> implements Message<T> {
             msgMetadataBuilder.setEventTime(singleMessageMetadata.getEventTime());
         }
 
+        if (singleMessageMetadata.hasSequenceId()) {
+            msgMetadataBuilder.setSequenceId(singleMessageMetadata.getSequenceId());
+        }
+
         this.schema = schema;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 3dcc367..4c156e5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4844,6 +4844,10 @@ public final class PulsarApi {
     // optional bytes ordering_key = 7;
     boolean hasOrderingKey();
     org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getOrderingKey();
+    
+    // optional uint64 sequence_id = 8;
+    boolean hasSequenceId();
+    long getSequenceId();
   }
   public static final class SingleMessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4983,6 +4987,16 @@ public final class PulsarApi {
       return orderingKey_;
     }
     
+    // optional uint64 sequence_id = 8;
+    public static final int SEQUENCE_ID_FIELD_NUMBER = 8;
+    private long sequenceId_;
+    public boolean hasSequenceId() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public long getSequenceId() {
+      return sequenceId_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
@@ -4991,6 +5005,7 @@ public final class PulsarApi {
       eventTime_ = 0L;
       partitionKeyB64Encoded_ = false;
       orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
+      sequenceId_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5040,6 +5055,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBytes(7, orderingKey_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeUInt64(8, sequenceId_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -5076,6 +5094,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBytesSize(7, orderingKey_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(8, sequenceId_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -5203,6 +5225,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000020);
         orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000040);
+        sequenceId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
       
@@ -5265,6 +5289,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000020;
         }
         result.orderingKey_ = orderingKey_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.sequenceId_ = sequenceId_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -5299,6 +5327,9 @@ public final class PulsarApi {
         if (other.hasOrderingKey()) {
           setOrderingKey(other.getOrderingKey());
         }
+        if (other.hasSequenceId()) {
+          setSequenceId(other.getSequenceId());
+        }
         return this;
       }
       
@@ -5374,6 +5405,11 @@ public final class PulsarApi {
               orderingKey_ = input.readBytes();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              sequenceId_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -5613,6 +5649,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint64 sequence_id = 8;
+      private long sequenceId_ ;
+      public boolean hasSequenceId() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      public long getSequenceId() {
+        return sequenceId_;
+      }
+      public Builder setSequenceId(long value) {
+        bitField0_ |= 0x00000080;
+        sequenceId_ = value;
+        
+        return this;
+      }
+      public Builder clearSequenceId() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        sequenceId_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index ba801d9..5af961c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1170,6 +1170,10 @@ public class Commands {
             singleMessageMetadataBuilder.setEventTime(msgBuilder.getEventTime());
         }
 
+        if (msgBuilder.hasSequenceId()) {
+            singleMessageMetadataBuilder.setSequenceId(msgBuilder.getSequenceId());
+        }
+
         try {
             return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
         } finally {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index d6db53a..b9c7754 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -137,6 +137,8 @@ message SingleMessageMetadata {
 	optional bool partition_key_b64_encoded = 6 [ default = false ];
     // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
 	optional bytes ordering_key = 7;
+	// Allows the consumer to retrieve the sequence id that the producer set.
+	optional uint64 sequence_id = 8;
 }
 
 enum ServerError {