You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:14 UTC

[pulsar] 05/25: [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56ea8a887178167f7a437d62a26489fb6ea8e436
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jul 16 05:53:30 2020 -0700

    [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)
    
    *Motivation*
    
    The code generation for `repeated long` is not handled properly. (I am not sure how changes were made to PulsarApi.proto)
    
    *Modification*
    
    This pull request adds the code to handle generating code for `repeated long`.
    
    *Test*
    
    Add unit tests to ensure `repeated long` is processed. Add test cases to cover both packed and non-packed serialization for `repeated long`.
    
    See more details about packed serialization: https://developers.google.com/protocol-buffers/docs/encoding#optional
    
    (cherry picked from commit 4e358ef9f3fa8c6164286d6e80f6e75f66c31eab)
---
 pom.xml                                            |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  18 +
 .../util/protobuf/ByteBufCodedInputStream.java     |  30 +
 .../apache/pulsar/common/api/proto/TestApi.java    | 641 +++++++++++++++++++++
 .../common/protocol/RepeatedLongNonPackedTest.java |  65 +++
 .../common/protocol/RepeatedLongPackedTest.java    |  65 +++
 pulsar-common/src/test/proto/TestApi.proto         |  32 +
 7 files changed, 853 insertions(+)

diff --git a/pom.xml b/pom.xml
index 822dd65..246edd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1272,6 +1272,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
+            <exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
@@ -1347,6 +1348,7 @@ flexible messaging model and an intuitive client API.</description>
                  and are included in source tree for convenience -->
             <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+            <exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>bin/proto/MLDataFormats_pb2.py</exclude>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4d1babf..2f822e8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -1578,6 +1578,15 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
+            case 42: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
           }
         }
       }
@@ -18860,6 +18869,15 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
+            case 34: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
           }
         }
       }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
index e4dad0c..caac6d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
@@ -346,4 +346,34 @@ public class ByteBufCodedInputStream {
 
         buf.readerIndex(buf.readerIndex() + size);
     }
+
+    public int pushLimit(int byteLimit) throws InvalidProtocolBufferException {
+        if (byteLimit < 0) {
+            throw new InvalidProtocolBufferException("CodedInputStream encountered an embedded string or message"
+                + " which claimed to have negative size.");
+        }
+
+        byteLimit += buf.readerIndex();
+        final int oldLimit = buf.writerIndex();
+        if (byteLimit > oldLimit) {
+            throw new InvalidProtocolBufferException("While parsing a protocol message, the input ended unexpectedly"
+                + " in the middle of a field.  This could mean either than the input has been truncated or that an"
+                + " embedded message misreported its own length.");
+        }
+        buf.writerIndex(byteLimit);
+        return oldLimit;
+    }
+
+    /**
+     * Discards the current limit, returning to the previous limit.
+     *
+     * @param oldLimit The old limit, as returned by {@code pushLimit}.
+     */
+    public void popLimit(final int oldLimit) {
+        buf.writerIndex(oldLimit);
+    }
+
+    public int getBytesUntilLimit() {
+        return buf.readableBytes();
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
new file mode 100644
index 0000000..f165a94
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
@@ -0,0 +1,641 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/test/proto/TestApi.proto
+
+package org.apache.pulsar.common.api.proto;
+
+public final class TestApi {
+  private TestApi() {}
+  public static void registerAllExtensions(
+      org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite registry) {
+  }
+  public interface MessageIdDataOrBuilder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // required uint64 ledgerId = 1;
+    boolean hasLedgerId();
+    long getLedgerId();
+    
+    // required uint64 entryId = 2;
+    boolean hasEntryId();
+    long getEntryId();
+    
+    // optional int32 partition = 3 [default = -1];
+    boolean hasPartition();
+    int getPartition();
+    
+    // optional int32 batch_index = 4 [default = -1];
+    boolean hasBatchIndex();
+    int getBatchIndex();
+    
+    // repeated int64 ack_set = 5 [packed = true];
+    java.util.List<java.lang.Long> getAckSetList();
+    int getAckSetCount();
+    long getAckSet(int index);
+  }
+  public static final class MessageIdData extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use MessageIdData.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private MessageIdData(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<MessageIdData> RECYCLER = new io.netty.util.Recycler<MessageIdData>() {
+            protected MessageIdData newObject(Handle handle) {
+              return new MessageIdData(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private MessageIdData(boolean noInit) {}
+    
+    private static final MessageIdData defaultInstance;
+    public static MessageIdData getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public MessageIdData getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 ledgerId = 1;
+    public static final int LEDGERID_FIELD_NUMBER = 1;
+    private long ledgerId_;
+    public boolean hasLedgerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getLedgerId() {
+      return ledgerId_;
+    }
+    
+    // required uint64 entryId = 2;
+    public static final int ENTRYID_FIELD_NUMBER = 2;
+    private long entryId_;
+    public boolean hasEntryId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getEntryId() {
+      return entryId_;
+    }
+    
+    // optional int32 partition = 3 [default = -1];
+    public static final int PARTITION_FIELD_NUMBER = 3;
+    private int partition_;
+    public boolean hasPartition() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getPartition() {
+      return partition_;
+    }
+    
+    // optional int32 batch_index = 4 [default = -1];
+    public static final int BATCH_INDEX_FIELD_NUMBER = 4;
+    private int batchIndex_;
+    public boolean hasBatchIndex() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public int getBatchIndex() {
+      return batchIndex_;
+    }
+    
+    // repeated int64 ack_set = 5 [packed = true];
+    public static final int ACK_SET_FIELD_NUMBER = 5;
+    private java.util.List<java.lang.Long> ackSet_;
+    public java.util.List<java.lang.Long>
+        getAckSetList() {
+      return ackSet_;
+    }
+    public int getAckSetCount() {
+      return ackSet_.size();
+    }
+    public long getAckSet(int index) {
+      return ackSet_.get(index);
+    }
+    private int ackSetMemoizedSerializedSize = -1;
+    
+    private void initFields() {
+      ledgerId_ = 0L;
+      entryId_ = 0L;
+      partition_ = -1;
+      batchIndex_ = -1;
+      ackSet_ = java.util.Collections.emptyList();;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasLedgerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasEntryId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, entryId_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, partition_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt32(4, batchIndex_);
+      }
+      if (getAckSetList().size() > 0) {
+        output.writeRawVarint32(42);
+        output.writeRawVarint32(ackSetMemoizedSerializedSize);
+      }
+      for (int i = 0; i < ackSet_.size(); i++) {
+        output.writeInt64NoTag(ackSet_.get(i));
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(2, entryId_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(3, partition_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(4, batchIndex_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < ackSet_.size(); i++) {
+          dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+            .computeInt64SizeNoTag(ackSet_.get(i));
+        }
+        size += dataSize;
+        if (!getAckSetList().isEmpty()) {
+          size += 1;
+          size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+              .computeInt32SizeNoTag(dataSize);
+        }
+        ackSetMemoizedSerializedSize = dataSize;
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(byte[] data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.TestApi.MessageIdData prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.TestApi.MessageIdData, Builder>
+        implements org.apache.pulsar.common.api.proto.TestApi.MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.TestApi.MessageIdData.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        ledgerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        entryId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        partition_ = -1;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        batchIndex_ = -1;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.TestApi.MessageIdData getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.TestApi.MessageIdData build() {
+        org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildParsed()
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildPartial() {
+        org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = org.apache.pulsar.common.api.proto.TestApi.MessageIdData.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.ledgerId_ = ledgerId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.entryId_ = entryId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.partition_ = partition_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.batchIndex_ = batchIndex_;
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+          ackSet_ = java.util.Collections.unmodifiableList(ackSet_);
+          bitField0_ = (bitField0_ & ~0x00000010);
+        }
+        result.ackSet_ = ackSet_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.TestApi.MessageIdData other) {
+        if (other == org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance()) return this;
+        if (other.hasLedgerId()) {
+          setLedgerId(other.getLedgerId());
+        }
+        if (other.hasEntryId()) {
+          setEntryId(other.getEntryId());
+        }
+        if (other.hasPartition()) {
+          setPartition(other.getPartition());
+        }
+        if (other.hasBatchIndex()) {
+          setBatchIndex(other.getBatchIndex());
+        }
+        if (!other.ackSet_.isEmpty()) {
+          if (ackSet_.isEmpty()) {
+            ackSet_ = other.ackSet_;
+            bitField0_ = (bitField0_ & ~0x00000010);
+          } else {
+            ensureAckSetIsMutable();
+            ackSet_.addAll(other.ackSet_);
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasLedgerId()) {
+          
+          return false;
+        }
+        if (!hasEntryId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              ledgerId_ = input.readUInt64();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              entryId_ = input.readUInt64();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              partition_ = input.readInt32();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              batchIndex_ = input.readInt32();
+              break;
+            }
+            case 40: {
+              ensureAckSetIsMutable();
+              ackSet_.add(input.readInt64());
+              break;
+            }
+            case 42: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 ledgerId = 1;
+      private long ledgerId_ ;
+      public boolean hasLedgerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getLedgerId() {
+        return ledgerId_;
+      }
+      public Builder setLedgerId(long value) {
+        bitField0_ |= 0x00000001;
+        ledgerId_ = value;
+        
+        return this;
+      }
+      public Builder clearLedgerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        ledgerId_ = 0L;
+        
+        return this;
+      }
+      
+      // required uint64 entryId = 2;
+      private long entryId_ ;
+      public boolean hasEntryId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getEntryId() {
+        return entryId_;
+      }
+      public Builder setEntryId(long value) {
+        bitField0_ |= 0x00000002;
+        entryId_ = value;
+        
+        return this;
+      }
+      public Builder clearEntryId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        entryId_ = 0L;
+        
+        return this;
+      }
+      
+      // optional int32 partition = 3 [default = -1];
+      private int partition_ = -1;
+      public boolean hasPartition() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getPartition() {
+        return partition_;
+      }
+      public Builder setPartition(int value) {
+        bitField0_ |= 0x00000004;
+        partition_ = value;
+        
+        return this;
+      }
+      public Builder clearPartition() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        partition_ = -1;
+        
+        return this;
+      }
+      
+      // optional int32 batch_index = 4 [default = -1];
+      private int batchIndex_ = -1;
+      public boolean hasBatchIndex() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public int getBatchIndex() {
+        return batchIndex_;
+      }
+      public Builder setBatchIndex(int value) {
+        bitField0_ |= 0x00000008;
+        batchIndex_ = value;
+        
+        return this;
+      }
+      public Builder clearBatchIndex() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        batchIndex_ = -1;
+        
+        return this;
+      }
+      
+      // repeated int64 ack_set = 5 [packed = true];
+      private java.util.List<java.lang.Long> ackSet_ = java.util.Collections.emptyList();;
+      private void ensureAckSetIsMutable() {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+          ackSet_ = new java.util.ArrayList<java.lang.Long>(ackSet_);
+          bitField0_ |= 0x00000010;
+         }
+      }
+      public java.util.List<java.lang.Long>
+          getAckSetList() {
+        return java.util.Collections.unmodifiableList(ackSet_);
+      }
+      public int getAckSetCount() {
+        return ackSet_.size();
+      }
+      public long getAckSet(int index) {
+        return ackSet_.get(index);
+      }
+      public Builder setAckSet(
+          int index, long value) {
+        ensureAckSetIsMutable();
+        ackSet_.set(index, value);
+        
+        return this;
+      }
+      public Builder addAckSet(long value) {
+        ensureAckSetIsMutable();
+        ackSet_.add(value);
+        
+        return this;
+      }
+      public Builder addAllAckSet(
+          java.lang.Iterable<? extends java.lang.Long> values) {
+        ensureAckSetIsMutable();
+        super.addAll(values, ackSet_);
+        
+        return this;
+      }
+      public Builder clearAckSet() {
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageIdData)
+    }
+    
+    static {
+      defaultInstance = new MessageIdData(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.MessageIdData)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
new file mode 100644
index 0000000..7c50d13
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+public class RepeatedLongNonPackedTest {
+
+    @Test
+    public void testRepeatedLongPacked() throws Exception {
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+            .setLedgerId(0L)
+            .setEntryId(0L)
+            .setPartition(0)
+            .setBatchIndex(0)
+            .addAckSet(1000)
+            .addAckSet(1001)
+            .addAckSet(1003)
+            .build();
+
+        int cmdSize = messageIdData.getSerializedSize();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
+        ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
+        messageIdData.writeTo(outputStream);
+
+        messageIdData.recycle();
+        outputStream.recycle();
+
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
+        MessageIdData newMessageIdData = MessageIdData.newBuilder()
+            .mergeFrom(inputStream, null)
+            .build();
+        inputStream.recycle();
+
+        assertEquals(3, newMessageIdData.getAckSetCount());
+        assertEquals(1000, newMessageIdData.getAckSet(0));
+        assertEquals(1001, newMessageIdData.getAckSet(1));
+        assertEquals(1003, newMessageIdData.getAckSet(2));
+        newMessageIdData.recycle();
+    }
+
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
new file mode 100644
index 0000000..e0569a8
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.TestApi.MessageIdData;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+/** Round-trips TestApi's MessageIdData, whose ack_set field is declared packed in TestApi.proto. */
+public class RepeatedLongPackedTest {
+
+    @Test
+    public void testRepeatedLongPacked() throws Exception {
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+            .setLedgerId(0L).setEntryId(0L).setPartition(0).setBatchIndex(0)
+            .addAckSet(1000).addAckSet(1001).addAckSet(1003)
+            .build();
+
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(messageIdData.getSerializedSize());
+        try {
+            ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
+            messageIdData.writeTo(outputStream);
+            messageIdData.recycle();
+            outputStream.recycle();
+
+            ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
+            MessageIdData newMessageIdData = MessageIdData.newBuilder()
+                .mergeFrom(inputStream, null)
+                .build();
+            inputStream.recycle();
+
+            // TestNG's assertEquals takes (actual, expected), the reverse of JUnit's order.
+            assertEquals(newMessageIdData.getAckSetCount(), 3);
+            assertEquals(newMessageIdData.getAckSet(0), 1000);
+            assertEquals(newMessageIdData.getAckSet(1), 1001);
+            assertEquals(newMessageIdData.getAckSet(2), 1003);
+            newMessageIdData.recycle();
+        } finally {
+            // This test allocated the buffer, so it must drop the reference even when an assertion fails.
+            buf.release();
+        }
+    }
+
+}
diff --git a/pulsar-common/src/test/proto/TestApi.proto b/pulsar-common/src/test/proto/TestApi.proto
new file mode 100644
index 0000000..24c90e4
--- /dev/null
+++ b/pulsar-common/src/test/proto/TestApi.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.proto;
+option java_package = "org.apache.pulsar.common.api.proto"; // Java package of the generated classes
+option optimize_for = LITE_RUNTIME;
+
+message MessageIdData { // test-only message; RepeatedLongPackedTest imports it as TestApi.MessageIdData
+    required uint64 ledgerId = 1;
+    required uint64 entryId  = 2;
+    optional int32 partition = 3 [default = -1]; // reads as -1 when the field is not set
+    optional int32 batch_index = 4 [default = -1]; // reads as -1 when the field is not set
+    repeated int64 ack_set = 5 [packed = true]; // packed: all values share one length-delimited record
+}
+