You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:14 UTC
[pulsar] 05/25: [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 56ea8a887178167f7a437d62a26489fb6ea8e436
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jul 16 05:53:30 2020 -0700
[PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)
*Motivation*
The code generation for `repeated long` is not handled properly. (I am not sure how changes were made to PulsarApi.proto)
*Modification*
This pull request adds the code to handle generating code for `repeated long`.
*Test*
Add a unit test to ensure `repeated long` is processed. Add test cases that cover both packed and non-packed serialization of `repeated long`.
See more details about packed serialization: https://developers.google.com/protocol-buffers/docs/encoding#optional
(cherry picked from commit 4e358ef9f3fa8c6164286d6e80f6e75f66c31eab)
---
pom.xml | 2 +
.../apache/pulsar/common/api/proto/PulsarApi.java | 18 +
.../util/protobuf/ByteBufCodedInputStream.java | 30 +
.../apache/pulsar/common/api/proto/TestApi.java | 641 +++++++++++++++++++++
.../common/protocol/RepeatedLongNonPackedTest.java | 65 +++
.../common/protocol/RepeatedLongPackedTest.java | 65 +++
pulsar-common/src/test/proto/TestApi.proto | 32 +
7 files changed, 853 insertions(+)
diff --git a/pom.xml b/pom.xml
index 822dd65..246edd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1272,6 +1272,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
+ <exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
@@ -1347,6 +1348,7 @@ flexible messaging model and an intuitive client API.</description>
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+ <exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>bin/proto/MLDataFormats_pb2.py</exclude>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4d1babf..2f822e8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -1578,6 +1578,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
+ case 42: {
+ int length = input.readRawVarint32();
+ int limit = input.pushLimit(length);
+ while (input.getBytesUntilLimit() > 0) {
+ addAckSet(input.readInt64());
+ }
+ input.popLimit(limit);
+ break;
+ }
}
}
}
@@ -18860,6 +18869,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
+ case 34: {
+ int length = input.readRawVarint32();
+ int limit = input.pushLimit(length);
+ while (input.getBytesUntilLimit() > 0) {
+ addAckSet(input.readInt64());
+ }
+ input.popLimit(limit);
+ break;
+ }
}
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
index e4dad0c..caac6d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
@@ -346,4 +346,34 @@ public class ByteBufCodedInputStream {
buf.readerIndex(buf.readerIndex() + size);
}
+
+ /**
+  * Restricts reading to the next {@code byteLimit} bytes by temporarily lowering the writer index
+  * of the underlying buffer.
+  *
+  * @param byteLimit number of bytes, counted from the current reader index, that may still be read
+  * @return the previous writer index; pass it to {@code popLimit} to restore the original limit
+  * @throws InvalidProtocolBufferException if {@code byteLimit} is negative or exceeds the number of
+  *         bytes currently readable
+  */
+ public int pushLimit(int byteLimit) throws InvalidProtocolBufferException {
+     if (byteLimit < 0) {
+         throw new InvalidProtocolBufferException("CodedInputStream encountered an embedded string or message"
+                 + " which claimed to have negative size.");
+     }
+
+     final int oldLimit = buf.writerIndex();
+     final int readerIndex = buf.readerIndex();
+     // Compare against the remaining bytes rather than computing readerIndex + byteLimit first:
+     // for a huge length read off the wire that sum can overflow int, turn negative and slip past
+     // the bounds check, ending in an unchecked exception from writerIndex(...) instead.
+     if (byteLimit > oldLimit - readerIndex) {
+         throw new InvalidProtocolBufferException("While parsing a protocol message, the input ended unexpectedly"
+                 + " in the middle of a field. This could mean either that the input has been truncated or that an"
+                 + " embedded message misreported its own length.");
+     }
+     buf.writerIndex(readerIndex + byteLimit);
+     return oldLimit;
+ }
+
+ /**
+  * Restores the limit that was in effect before the matching {@code pushLimit} call.
+  *
+  * @param oldLimit the value previously returned by {@code pushLimit}; it is written back as the
+  *                 writer index of the underlying buffer
+  */
+ public void popLimit(final int oldLimit) {
+     this.buf.writerIndex(oldLimit);
+ }
+
+ /**
+  * Reports how many bytes can still be read before the current limit is reached.
+  *
+  * @return the distance between the buffer's writer index and its reader index
+  */
+ public int getBytesUntilLimit() {
+     final int limit = buf.writerIndex();
+     final int position = buf.readerIndex();
+     return limit - position;
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
new file mode 100644
index 0000000..f165a94
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
@@ -0,0 +1,641 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: src/test/proto/TestApi.proto
+
+package org.apache.pulsar.common.api.proto;
+
+public final class TestApi { // Holder for the message types generated from src/test/proto/TestApi.proto.
+ private TestApi() {} // Not instantiable.
+ public static void registerAllExtensions( // No-op: the body below is empty.
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite registry) {
+ }
+ public interface MessageIdDataOrBuilder // Read accessors implemented by both MessageIdData and its Builder.
+ extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+
+ // required uint64 ledgerId = 1;
+ boolean hasLedgerId();
+ long getLedgerId();
+
+ // required uint64 entryId = 2;
+ boolean hasEntryId();
+ long getEntryId();
+
+ // optional int32 partition = 3 [default = -1];
+ boolean hasPartition();
+ int getPartition();
+
+ // optional int32 batch_index = 4 [default = -1];
+ boolean hasBatchIndex();
+ int getBatchIndex();
+
+ // repeated int64 ack_set = 5 [packed = true];
+ java.util.List<java.lang.Long> getAckSetList();
+ int getAckSetCount();
+ long getAckSet(int index);
+ }
+ public static final class MessageIdData extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+ implements MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
+ // Use MessageIdData.newBuilder() to construct.
+ private io.netty.util.Recycler.Handle handle; // Recycler handle; recycle() uses it to return this instance to RECYCLER.
+ private MessageIdData(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ }
+
+ private static final io.netty.util.Recycler<MessageIdData> RECYCLER = new io.netty.util.Recycler<MessageIdData>() {
+ protected MessageIdData newObject(Handle handle) {
+ return new MessageIdData(handle);
+ }
+ };
+
+ public void recycle() { // Resets every field and memoized value, then hands the instance back to RECYCLER.
+ this.initFields();
+ this.memoizedIsInitialized = -1;
+ this.bitField0_ = 0;
+ this.memoizedSerializedSize = -1;
+ if (handle != null) { RECYCLER.recycle(this, handle); } // handle stays null for instances built via MessageIdData(boolean), e.g. defaultInstance.
+ }
+
+ private MessageIdData(boolean noInit) {} // Used for defaultInstance (see the static initializer); leaves handle unset.
+
+ private static final MessageIdData defaultInstance;
+ public static MessageIdData getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public MessageIdData getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private int bitField0_; // Presence bits: 0x1 ledgerId, 0x2 entryId, 0x4 partition, 0x8 batchIndex.
+ // required uint64 ledgerId = 1;
+ public static final int LEDGERID_FIELD_NUMBER = 1;
+ private long ledgerId_;
+ public boolean hasLedgerId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public long getLedgerId() {
+ return ledgerId_;
+ }
+
+ // required uint64 entryId = 2;
+ public static final int ENTRYID_FIELD_NUMBER = 2;
+ private long entryId_;
+ public boolean hasEntryId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public long getEntryId() {
+ return entryId_;
+ }
+
+ // optional int32 partition = 3 [default = -1];
+ public static final int PARTITION_FIELD_NUMBER = 3;
+ private int partition_;
+ public boolean hasPartition() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public int getPartition() {
+ return partition_;
+ }
+
+ // optional int32 batch_index = 4 [default = -1];
+ public static final int BATCH_INDEX_FIELD_NUMBER = 4;
+ private int batchIndex_;
+ public boolean hasBatchIndex() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public int getBatchIndex() {
+ return batchIndex_;
+ }
+
+ // repeated int64 ack_set = 5 [packed = true];
+ public static final int ACK_SET_FIELD_NUMBER = 5;
+ private java.util.List<java.lang.Long> ackSet_;
+ public java.util.List<java.lang.Long>
+ getAckSetList() {
+ return ackSet_;
+ }
+ public int getAckSetCount() {
+ return ackSet_.size();
+ }
+ public long getAckSet(int index) {
+ return ackSet_.get(index);
+ }
+ private int ackSetMemoizedSerializedSize = -1; // Byte length of the packed ack_set payload; assigned in getSerializedSize().
+
+ private void initFields() { // Puts every field back to its declared default.
+ ledgerId_ = 0L;
+ entryId_ = 0L;
+ partition_ = -1;
+ batchIndex_ = -1;
+ ackSet_ = java.util.Collections.emptyList();;
+ }
+ private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = a required field is missing, 1 = initialized.
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasLedgerId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasEntryId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+ throws java.io.IOException {
+ throw new RuntimeException("Cannot use CodedOutputStream");
+ }
+
+ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize(); // Called for its side effect: makes sure ackSetMemoizedSerializedSize is populated before it is written below.
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeUInt64(1, ledgerId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeUInt64(2, entryId_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt32(3, partition_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt32(4, batchIndex_);
+ }
+ if (getAckSetList().size() > 0) {
+ output.writeRawVarint32(42); // Tag 42 = (field 5 << 3) | wire type 2 (length-delimited).
+ output.writeRawVarint32(ackSetMemoizedSerializedSize); // Length prefix: byte size of the packed payload.
+ }
+ for (int i = 0; i < ackSet_.size(); i++) {
+ output.writeInt64NoTag(ackSet_.get(i)); // Elements carry no per-element tag.
+ }
+ }
+
+ private int memoizedSerializedSize = -1; // -1 = not computed yet.
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(1, ledgerId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(2, entryId_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32Size(3, partition_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32Size(4, batchIndex_);
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < ackSet_.size(); i++) {
+ dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt64SizeNoTag(ackSet_.get(i));
+ }
+ size += dataSize;
+ if (!getAckSetList().isEmpty()) {
+ size += 1; // One byte for the tag that writeTo emits (42).
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32SizeNoTag(dataSize); // Bytes taken by the length prefix.
+ }
+ ackSetMemoizedSerializedSize = dataSize;
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ throw new RuntimeException("Disabled");
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ throw new RuntimeException("Disabled");
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(byte[] data)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+ byte[] data,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+ java.io.InputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(
+ java.io.InputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.pulsar.common.api.proto.TestApi.MessageIdData prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+ org.apache.pulsar.common.api.proto.TestApi.MessageIdData, Builder>
+ implements org.apache.pulsar.common.api.proto.TestApi.MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
+ // Construct using org.apache.pulsar.common.api.proto.TestApi.MessageIdData.newBuilder()
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ maybeForceBuilderInitialization();
+ }
+ private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+ return new Builder(handle);
+ }
+ };
+
+ public void recycle() { // Clears the builder, then returns it to the Builder RECYCLER when a handle is present.
+ clear();
+ if (handle != null) {RECYCLER.recycle(this, handle);}
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() { // Builders are obtained from RECYCLER rather than constructed directly.
+ return RECYCLER.get();
+ }
+
+ public Builder clear() {
+ super.clear();
+ ledgerId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ entryId_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ partition_ = -1;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ batchIndex_ = -1;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ ackSet_ = java.util.Collections.emptyList();;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public org.apache.pulsar.common.api.proto.TestApi.MessageIdData getDefaultInstanceForType() {
+ return org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance();
+ }
+
+ public org.apache.pulsar.common.api.proto.TestApi.MessageIdData build() {
+ org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildParsed()
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildPartial() {
+ org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = org.apache.pulsar.common.api.proto.TestApi.MessageIdData.RECYCLER.get(); // The message instance is taken from MessageIdData.RECYCLER.
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.ledgerId_ = ledgerId_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.entryId_ = entryId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.partition_ = partition_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.batchIndex_ = batchIndex_;
+ if (((bitField0_ & 0x00000010) == 0x00000010)) { // Builder holds its own mutable list: wrap it read-only and drop the ownership bit.
+ ackSet_ = java.util.Collections.unmodifiableList(ackSet_);
+ bitField0_ = (bitField0_ & ~0x00000010);
+ }
+ result.ackSet_ = ackSet_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder mergeFrom(org.apache.pulsar.common.api.proto.TestApi.MessageIdData other) {
+ if (other == org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance()) return this;
+ if (other.hasLedgerId()) {
+ setLedgerId(other.getLedgerId());
+ }
+ if (other.hasEntryId()) {
+ setEntryId(other.getEntryId());
+ }
+ if (other.hasPartition()) {
+ setPartition(other.getPartition());
+ }
+ if (other.hasBatchIndex()) {
+ setBatchIndex(other.getBatchIndex());
+ }
+ if (!other.ackSet_.isEmpty()) {
+ if (ackSet_.isEmpty()) {
+ ackSet_ = other.ackSet_; // Share other's list instead of copying it.
+ bitField0_ = (bitField0_ & ~0x00000010); // Ownership bit cleared, so ensureAckSetIsMutable() copies before any later mutation.
+ } else {
+ ensureAckSetIsMutable();
+ ackSet_.addAll(other.ackSet_);
+ }
+
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasLedgerId()) {
+
+ return false;
+ }
+ if (!hasEntryId()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ throw new java.io.IOException("Merge from CodedInputStream is disabled");
+ }
+ public Builder mergeFrom(
+ org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ while (true) { // Reads one tag per iteration until one of the return statements below is hit.
+ int tag = input.readTag();
+ switch (tag) {
+ case 0: // NOTE(review): presumably readTag() yields 0 at end of input - confirm in ByteBufCodedInputStream.
+
+ return this;
+ default: { // Tag not handled by any case below: skip the field.
+ if (!input.skipField(tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ ledgerId_ = input.readUInt64();
+ break;
+ }
+ case 16: {
+ bitField0_ |= 0x00000002;
+ entryId_ = input.readUInt64();
+ break;
+ }
+ case 24: {
+ bitField0_ |= 0x00000004;
+ partition_ = input.readInt32();
+ break;
+ }
+ case 32: {
+ bitField0_ |= 0x00000008;
+ batchIndex_ = input.readInt32();
+ break;
+ }
+ case 40: { // Tag 40 = (field 5 << 3) | wire type 0: a single non-packed ack_set element.
+ ensureAckSetIsMutable();
+ ackSet_.add(input.readInt64());
+ break;
+ }
+ case 42: { // Tag 42 = (field 5 << 3) | wire type 2: packed ack_set, a length-prefixed run of varints.
+ int length = input.readRawVarint32();
+ int limit = input.pushLimit(length); // Confine reads to the packed payload.
+ while (input.getBytesUntilLimit() > 0) {
+ addAckSet(input.readInt64());
+ }
+ input.popLimit(limit); // Restore the limit that was active before the payload.
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_; // Bits 0x1-0x8 track field presence; bit 0x10 marks ackSet_ as a builder-owned mutable list.
+
+ // required uint64 ledgerId = 1;
+ private long ledgerId_ ;
+ public boolean hasLedgerId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public long getLedgerId() {
+ return ledgerId_;
+ }
+ public Builder setLedgerId(long value) {
+ bitField0_ |= 0x00000001;
+ ledgerId_ = value;
+
+ return this;
+ }
+ public Builder clearLedgerId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ ledgerId_ = 0L;
+
+ return this;
+ }
+
+ // required uint64 entryId = 2;
+ private long entryId_ ;
+ public boolean hasEntryId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public long getEntryId() {
+ return entryId_;
+ }
+ public Builder setEntryId(long value) {
+ bitField0_ |= 0x00000002;
+ entryId_ = value;
+
+ return this;
+ }
+ public Builder clearEntryId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ entryId_ = 0L;
+
+ return this;
+ }
+
+ // optional int32 partition = 3 [default = -1];
+ private int partition_ = -1;
+ public boolean hasPartition() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public int getPartition() {
+ return partition_;
+ }
+ public Builder setPartition(int value) {
+ bitField0_ |= 0x00000004;
+ partition_ = value;
+
+ return this;
+ }
+ public Builder clearPartition() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ partition_ = -1;
+
+ return this;
+ }
+
+ // optional int32 batch_index = 4 [default = -1];
+ private int batchIndex_ = -1;
+ public boolean hasBatchIndex() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public int getBatchIndex() {
+ return batchIndex_;
+ }
+ public Builder setBatchIndex(int value) {
+ bitField0_ |= 0x00000008;
+ batchIndex_ = value;
+
+ return this;
+ }
+ public Builder clearBatchIndex() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ batchIndex_ = -1;
+
+ return this;
+ }
+
+ // repeated int64 ack_set = 5 [packed = true];
+ private java.util.List<java.lang.Long> ackSet_ = java.util.Collections.emptyList();;
+ private void ensureAckSetIsMutable() {
+ if (!((bitField0_ & 0x00000010) == 0x00000010)) { // Bit 0x10 not set: replace ackSet_ with a private ArrayList copy before mutating.
+ ackSet_ = new java.util.ArrayList<java.lang.Long>(ackSet_);
+ bitField0_ |= 0x00000010;
+ }
+ }
+ public java.util.List<java.lang.Long>
+ getAckSetList() {
+ return java.util.Collections.unmodifiableList(ackSet_); // Read-only wrapper around the builder's list; no copy is made.
+ }
+ public int getAckSetCount() {
+ return ackSet_.size();
+ }
+ public long getAckSet(int index) {
+ return ackSet_.get(index);
+ }
+ public Builder setAckSet(
+ int index, long value) {
+ ensureAckSetIsMutable();
+ ackSet_.set(index, value);
+
+ return this;
+ }
+ public Builder addAckSet(long value) {
+ ensureAckSetIsMutable();
+ ackSet_.add(value);
+
+ return this;
+ }
+ public Builder addAllAckSet(
+ java.lang.Iterable<? extends java.lang.Long> values) {
+ ensureAckSetIsMutable();
+ super.addAll(values, ackSet_);
+
+ return this;
+ }
+ public Builder clearAckSet() {
+ ackSet_ = java.util.Collections.emptyList();;
+ bitField0_ = (bitField0_ & ~0x00000010);
+
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageIdData)
+ }
+
+ static { // Creates the shared default instance and gives its fields their default values.
+ defaultInstance = new MessageIdData(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:pulsar.proto.MessageIdData)
+ }
+
+
+ static {
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
new file mode 100644
index 0000000..7c50d13
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trips a {@code PulsarApi.MessageIdData} holding several {@code ack_set} values through
+ * {@link ByteBufCodedOutputStream} and {@link ByteBufCodedInputStream} and checks that every value
+ * comes back in its original position.
+ */
+public class RepeatedLongNonPackedTest {
+
+    // NOTE(review): the method name says "Packed" although this class is the non-packed variant
+    // (it uses PulsarApi.MessageIdData). The name is kept so existing test selectors keep matching.
+    @Test
+    public void testRepeatedLongPacked() throws Exception {
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+            .setLedgerId(0L)
+            .setEntryId(0L)
+            .setPartition(0)
+            .setBatchIndex(0)
+            .addAckSet(1000)
+            .addAckSet(1001)
+            .addAckSet(1003)
+            .build();
+
+        int cmdSize = messageIdData.getSerializedSize();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
+        try {
+            ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
+            messageIdData.writeTo(outputStream);
+
+            messageIdData.recycle();
+            outputStream.recycle();
+
+            ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
+            MessageIdData newMessageIdData = MessageIdData.newBuilder()
+                .mergeFrom(inputStream, null)
+                .build();
+            inputStream.recycle();
+
+            // TestNG's assertEquals takes (actual, expected); keeping that order makes failure
+            // messages report the right side as "expected".
+            assertEquals(newMessageIdData.getAckSetCount(), 3);
+            assertEquals(newMessageIdData.getAckSet(0), 1000L);
+            assertEquals(newMessageIdData.getAckSet(1), 1001L);
+            assertEquals(newMessageIdData.getAckSet(2), 1003L);
+            newMessageIdData.recycle();
+        } finally {
+            // The buffer was allocated by this test, so it is released here even when an
+            // assertion or the parse fails.
+            buf.release();
+        }
+    }
+
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
new file mode 100644
index 0000000..e0569a8
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.TestApi.MessageIdData;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trips a {@code TestApi.MessageIdData}, whose {@code ack_set} field is declared
+ * {@code [packed = true]}, through {@link ByteBufCodedOutputStream} and
+ * {@link ByteBufCodedInputStream} and checks that every value comes back in its original position.
+ */
+public class RepeatedLongPackedTest {
+
+    @Test
+    public void testRepeatedLongPacked() throws Exception {
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+            .setLedgerId(0L)
+            .setEntryId(0L)
+            .setPartition(0)
+            .setBatchIndex(0)
+            .addAckSet(1000)
+            .addAckSet(1001)
+            .addAckSet(1003)
+            .build();
+
+        int cmdSize = messageIdData.getSerializedSize();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
+        try {
+            ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
+            messageIdData.writeTo(outputStream);
+
+            messageIdData.recycle();
+            outputStream.recycle();
+
+            ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
+            MessageIdData newMessageIdData = MessageIdData.newBuilder()
+                .mergeFrom(inputStream, null)
+                .build();
+            inputStream.recycle();
+
+            // TestNG's assertEquals takes (actual, expected); keeping that order makes failure
+            // messages report the right side as "expected".
+            assertEquals(newMessageIdData.getAckSetCount(), 3);
+            assertEquals(newMessageIdData.getAckSet(0), 1000L);
+            assertEquals(newMessageIdData.getAckSet(1), 1001L);
+            assertEquals(newMessageIdData.getAckSet(2), 1003L);
+            newMessageIdData.recycle();
+        } finally {
+            // The buffer was allocated by this test, so it is released here even when an
+            // assertion or the parse fails.
+            buf.release();
+        }
+    }
+
+}
diff --git a/pulsar-common/src/test/proto/TestApi.proto b/pulsar-common/src/test/proto/TestApi.proto
new file mode 100644
index 0000000..24c90e4
--- /dev/null
+++ b/pulsar-common/src/test/proto/TestApi.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.proto;
+option java_package = "org.apache.pulsar.common.api.proto";
+option optimize_for = LITE_RUNTIME;
+
+message MessageIdData { // Test-only message; the generated TestApi.MessageIdData is used by RepeatedLongPackedTest.
+ required uint64 ledgerId = 1;
+ required uint64 entryId = 2;
+ optional int32 partition = 3 [default = -1]; // -1 when the field is not set.
+ optional int32 batch_index = 4 [default = -1]; // -1 when the field is not set.
+ repeated int64 ack_set = 5 [packed = true]; // Packed: all values are written as one length-delimited field.
+}
+