You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/30 14:48:18 UTC
[pulsar] branch master updated: [Transaction][buffer] Add data ledger position in txn commit marker (#4826)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a06c97 [Transaction][buffer] Add data ledger position in txn commit marker (#4826)
3a06c97 is described below
commit 3a06c97a10cc4dcf42b62f41a6add0d6ffa01ca4
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jul 30 22:48:13 2019 +0800
[Transaction][buffer] Add data ledger position in txn commit marker (#4826)
* [Transaction][buffer] Add data ledger position in txn commit marker
---
*Motivation*
Add the data ledger position (a MessageIdData holding the ledger id and entry id) to the commit marker of the transaction log.
*Modifications*
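Create a new TxnCommitMarker message in PulsarMarkers.proto, write it as the payload of the marker built by Markers.newTxnCommitMarker, and add Markers.parseCommitMarker to read it back.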
---
.../pulsar/common/api/proto/PulsarMarkers.java | 363 +++++++++++++++++++++
.../org/apache/pulsar/common/protocol/Markers.java | 37 ++-
pulsar-common/src/main/proto/PulsarMarkers.proto | 6 +
.../apache/pulsar/common/protocol/MarkersTest.java | 10 +-
4 files changed, 409 insertions(+), 7 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
index dbab061..5ef2434 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
@@ -2986,6 +2986,369 @@ public final class PulsarMarkers {
// @@protoc_insertion_point(class_scope:pulsar.proto.MessageIdData)
}
+ public interface TxnCommitMarkerOrBuilder
+ extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+ // Read-only accessors for message_id, implemented by both TxnCommitMarker and TxnCommitMarker.Builder.
+ // required .pulsar.proto.MessageIdData message_id = 1;
+ boolean hasMessageId();
+ org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData getMessageId();
+ }
+ public static final class TxnCommitMarker extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+ implements TxnCommitMarkerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
+ // Use TxnCommitMarker.newBuilder() to construct. Instances are taken from RECYCLER in Builder.buildPartial() and handed back by recycle().
+ private io.netty.util.Recycler.Handle handle;
+ private TxnCommitMarker(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ }
+ // Pool of TxnCommitMarker instances; newObject() creates one bound to the handle it is given.
+ private static final io.netty.util.Recycler<TxnCommitMarker> RECYCLER = new io.netty.util.Recycler<TxnCommitMarker>() {
+ protected TxnCommitMarker newObject(Handle handle) {
+ return new TxnCommitMarker(handle);
+ }
+ };
+ // Resets the field, the has-bits and both memoized values, then returns this instance to RECYCLER when it has a handle.
+ public void recycle() {
+ this.initFields();
+ this.memoizedIsInitialized = -1;
+ this.bitField0_ = 0;
+ this.memoizedSerializedSize = -1;
+ if (handle != null) { RECYCLER.recycle(this, handle); }
+ }
+ // Handle-less constructor; used in the static initializer below for defaultInstance, so recycle() never pools that instance.
+ private TxnCommitMarker(boolean noInit) {}
+
+ private static final TxnCommitMarker defaultInstance;
+ public static TxnCommitMarker getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public TxnCommitMarker getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+ // Bit 0x00000001 of bitField0_ records whether message_id has been set.
+ private int bitField0_;
+ // required .pulsar.proto.MessageIdData message_id = 1;
+ public static final int MESSAGE_ID_FIELD_NUMBER = 1;
+ private org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData messageId_;
+ public boolean hasMessageId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData getMessageId() {
+ return messageId_;
+ }
+
+ private void initFields() {
+ messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = not initialized, 1 = initialized
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasMessageId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getMessageId().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+ // Serialization through the shaded CodedOutputStream is not supported; this overload always throws.
+ public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+ throws java.io.IOException {
+ throw new RuntimeException("Cannot use CodedOutputStream");
+ }
+ // Writes message_id as field 1 to the ByteBuf-backed stream, only when its has-bit is set.
+ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize(); // return value unused; the call fills memoizedSerializedSize
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeMessage(1, messageId_);
+ }
+ }
+
+ private int memoizedSerializedSize = -1; // -1 = not computed yet
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeMessageSize(1, messageId_);
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+ // Parsing from a ByteString is disabled: both overloads below throw RuntimeException("Disabled").
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ throw new RuntimeException("Disabled");
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ throw new RuntimeException("Disabled");
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(byte[] data)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+ byte[] data,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+ java.io.InputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseDelimitedFrom(
+ java.io.InputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+ org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker, Builder>
+ implements org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarkerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
+ // Construct using org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.newBuilder()
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ maybeForceBuilderInitialization();
+ }
+ private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+ return new Builder(handle);
+ }
+ };
+ // Clears this builder and returns it to the builder RECYCLER when it has a handle.
+ public void recycle() {
+ clear();
+ if (handle != null) {RECYCLER.recycle(this, handle);}
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() {
+ return RECYCLER.get();
+ }
+
+ public Builder clear() {
+ super.clear();
+ messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker getDefaultInstanceForType() {
+ return org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.getDefaultInstance();
+ }
+ // Builds the message and throws the uninitialized-message exception when the result reports !isInitialized().
+ public org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker build() {
+ org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker buildParsed()
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+ // Takes a TxnCommitMarker from the message RECYCLER and copies this builder's message_id and has-bit into it, without validation.
+ public org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker buildPartial() {
+ org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker result = org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.RECYCLER.get();
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.messageId_ = messageId_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker other) {
+ if (other == org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.getDefaultInstance()) return this;
+ if (other.hasMessageId()) {
+ mergeMessageId(other.getMessageId());
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasMessageId()) {
+
+ return false;
+ }
+ if (!getMessageId().isInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+ // Merging from the shaded CodedInputStream is disabled; the ByteBufCodedInputStream overload below does the parsing.
+ public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ throw new java.io.IOException("Merge from CodedInputStream is disabled");
+ }
+ public Builder mergeFrom(
+ org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+
+ return this;
+ default: {
+ if (!input.skipField(tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 10: { // tag 10 = (field 1 << 3) | wire type 2, i.e. the length-delimited message_id
+ org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.newBuilder();
+ if (hasMessageId()) {
+ subBuilder.mergeFrom(getMessageId());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setMessageId(subBuilder.buildPartial());
+ subBuilder.recycle();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required .pulsar.proto.MessageIdData message_id = 1;
+ private org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+ public boolean hasMessageId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData getMessageId() {
+ return messageId_;
+ }
+ public Builder setMessageId(org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ messageId_ = value;
+
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder setMessageId(
+ org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.Builder builderForValue) {
+ messageId_ = builderForValue.build();
+
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder mergeMessageId(org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData value) {
+ if (((bitField0_ & 0x00000001) == 0x00000001) &&
+ messageId_ != org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance()) {
+ messageId_ =
+ org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.newBuilder(messageId_).mergeFrom(value).buildPartial();
+ } else {
+ messageId_ = value;
+ }
+
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder clearMessageId() {
+ messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:pulsar.proto.TxnCommitMarker)
+ }
+
+ static {
+ defaultInstance = new TxnCommitMarker(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:pulsar.proto.TxnCommitMarker)
+ }
+
static {
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index c2c1f57..597ae9b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
@@ -262,8 +263,8 @@ public class Markers {
}
public static ByteBuf newTxnCommitMarker(long sequenceId, long txnMostBits,
- long txnLeastBits) {
- return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits);
+ long txnLeastBits, MessageIdData messageIdData) {
+ return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits, Optional.of(messageIdData));
}
public static boolean isTxnAbortMarker(MessageMetadata msgMetadata) {
@@ -274,11 +275,26 @@ public class Markers {
public static ByteBuf newTxnAbortMarker(long sequenceId, long txnMostBits,
long txnLeastBits) {
- return newTxnMarker(MarkerType.TXN_ABORT, sequenceId, txnMostBits, txnLeastBits);
+ return newTxnMarker(MarkerType.TXN_ABORT, sequenceId, txnMostBits, txnLeastBits, Optional.empty());
}
+    /**
+     * Parses a {@code TxnCommitMarker} from the readable bytes of {@code payload}.
+     *
+     * <p>The builder and the input stream used for parsing are recycled before returning,
+     * whether parsing succeeds or fails.
+     *
+     * @param payload buffer whose readable bytes hold the serialized commit marker
+     * @return the parsed commit marker
+     * @throws IOException if reading the marker from the payload fails
+     */
+    public static PulsarMarkers.TxnCommitMarker parseCommitMarker(ByteBuf payload) throws IOException {
+        ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
+
+        PulsarMarkers.TxnCommitMarker.Builder builder = null;
+
+        try {
+            builder = PulsarMarkers.TxnCommitMarker.newBuilder();
+            return builder.mergeFrom(inStream, null).build();
+        } finally {
+            // builder is still null if newBuilder() itself threw; guarding here keeps a
+            // NullPointerException from masking that failure and from skipping inStream.recycle().
+            if (builder != null) {
+                builder.recycle();
+            }
+            inStream.recycle();
+        }
+    }
+
+ @SneakyThrows
private static ByteBuf newTxnMarker(MarkerType markerType, long sequenceId, long txnMostBits,
- long txnLeastBits) {
+ long txnLeastBits, Optional<MessageIdData> messageIdData) {
MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
msgMetadataBuilder.setProducerName("pulsar.txn.marker");
@@ -289,7 +305,18 @@ public class Markers {
MessageMetadata msgMetadata = msgMetadataBuilder.build();
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer();
+ ByteBuf payload;
+ if (messageIdData.isPresent()) {
+ PulsarMarkers.TxnCommitMarker commitMarker = PulsarMarkers.TxnCommitMarker.newBuilder()
+ .setMessageId(messageIdData.get())
+ .build();
+ int size = commitMarker.getSerializedSize();
+ payload = PooledByteBufAllocator.DEFAULT.buffer(size);
+ ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
+ commitMarker.writeTo(outStream);
+ } else {
+ payload = PooledByteBufAllocator.DEFAULT.buffer();
+ }
try {
return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto
index f86cba1..8e2ac21 100644
--- a/pulsar-common/src/main/proto/PulsarMarkers.proto
+++ b/pulsar-common/src/main/proto/PulsarMarkers.proto
@@ -80,3 +80,9 @@ message MessageIdData {
required uint64 ledger_id = 1;
required uint64 entry_id = 2;
}
+
+
+/// --- Transaction marker ---
+message TxnCommitMarker {
+ required MessageIdData message_id = 1; // ledger_id/entry_id pair (see MessageIdData) stored in the commit marker
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index 795c8ca..1427f86 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
@@ -117,12 +118,13 @@ public class MarkersTest {
}
@Test
- public void testTxnCommitMarker() {
+ public void testTxnCommitMarker() throws IOException {
long sequenceId = 1L;
long mostBits = 1234L;
long leastBits = 2345L;
- ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits);
+ ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits,
+ MessageIdData.newBuilder().setLedgerId(10).setEntryId(11).build());
MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
@@ -130,6 +132,10 @@ public class MarkersTest {
assertEquals(msgMetadata.getSequenceId(), sequenceId);
assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+
+ PulsarMarkers.TxnCommitMarker marker = Markers.parseCommitMarker(buf);
+ assertEquals(marker.getMessageId().getLedgerId(), 10);
+ assertEquals(marker.getMessageId().getEntryId(), 11);
}
@Test