You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/24 01:50:35 UTC
[pulsar] branch master updated: [Transaction][Buffer]Add new marker
to show which message belongs to transaction (#4776)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e8a95e5 [Transaction][Buffer]Add new marker to show which message belongs to transaction (#4776)
e8a95e5 is described below
commit e8a95e5ba57107c1e08a2610b7c55361a09f812b
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 24 09:50:27 2019 +0800
[Transaction][Buffer]Add new marker to show which message belongs to transaction (#4776)
* [Transaction][Buffer]Add new marker to show which message belongs to transaction
---
*Motivation*
Add new message types for transactions, covering data messages as well as the commit and abort markers written to the transaction log.
*Modifications*
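Add two new marker types for transactions, plus `txnid_least_bits` and `txnid_most_bits` fields on `MessageMetadata` that identify the transaction a message belongs to.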
TXN_COMMIT is the commit marker of the transaction.
TXN_ABORT is the abort marker of the transaction.
---
.../apache/pulsar/common/api/proto/PulsarApi.java | 114 +++++++++++++++++++++
.../pulsar/common/api/proto/PulsarMarkers.java | 6 ++
.../org/apache/pulsar/common/protocol/Markers.java | 45 ++++++++
pulsar-common/src/main/proto/PulsarApi.proto | 4 +
pulsar-common/src/main/proto/PulsarMarkers.proto | 2 +
.../apache/pulsar/common/protocol/MarkersTest.java | 33 ++++++
6 files changed, 204 insertions(+)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4c156e5..035f754 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3103,6 +3103,14 @@ public final class PulsarApi {
// optional int32 marker_type = 20;
boolean hasMarkerType();
int getMarkerType();
+
+ // optional uint64 txnid_least_bits = 22 [default = 0];
+ boolean hasTxnidLeastBits();
+ long getTxnidLeastBits();
+
+ // optional uint64 txnid_most_bits = 23 [default = 0];
+ boolean hasTxnidMostBits();
+ long getTxnidMostBits();
}
public static final class MessageMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -3443,6 +3451,26 @@ public final class PulsarApi {
return markerType_;
}
+ // optional uint64 txnid_least_bits = 22 [default = 0];
+ public static final int TXNID_LEAST_BITS_FIELD_NUMBER = 22;
+ private long txnidLeastBits_;
+ public boolean hasTxnidLeastBits() {
+ return ((bitField0_ & 0x00010000) == 0x00010000);
+ }
+ public long getTxnidLeastBits() {
+ return txnidLeastBits_;
+ }
+
+ // optional uint64 txnid_most_bits = 23 [default = 0];
+ public static final int TXNID_MOST_BITS_FIELD_NUMBER = 23;
+ private long txnidMostBits_;
+ public boolean hasTxnidMostBits() {
+ return ((bitField0_ & 0x00020000) == 0x00020000);
+ }
+ public long getTxnidMostBits() {
+ return txnidMostBits_;
+ }
+
private void initFields() {
producerName_ = "";
sequenceId_ = 0L;
@@ -3463,6 +3491,8 @@ public final class PulsarApi {
orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
deliverAtTime_ = 0L;
markerType_ = 0;
+ txnidLeastBits_ = 0L;
+ txnidMostBits_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3562,6 +3592,12 @@ public final class PulsarApi {
if (((bitField0_ & 0x00008000) == 0x00008000)) {
output.writeInt32(20, markerType_);
}
+ if (((bitField0_ & 0x00010000) == 0x00010000)) {
+ output.writeUInt64(22, txnidLeastBits_);
+ }
+ if (((bitField0_ & 0x00020000) == 0x00020000)) {
+ output.writeUInt64(23, txnidMostBits_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -3651,6 +3687,14 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeInt32Size(20, markerType_);
}
+ if (((bitField0_ & 0x00010000) == 0x00010000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(22, txnidLeastBits_);
+ }
+ if (((bitField0_ & 0x00020000) == 0x00020000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(23, txnidMostBits_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -3802,6 +3846,10 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00020000);
markerType_ = 0;
bitField0_ = (bitField0_ & ~0x00040000);
+ txnidLeastBits_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00080000);
+ txnidMostBits_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00100000);
return this;
}
@@ -3915,6 +3963,14 @@ public final class PulsarApi {
to_bitField0_ |= 0x00008000;
}
result.markerType_ = markerType_;
+ if (((from_bitField0_ & 0x00080000) == 0x00080000)) {
+ to_bitField0_ |= 0x00010000;
+ }
+ result.txnidLeastBits_ = txnidLeastBits_;
+ if (((from_bitField0_ & 0x00100000) == 0x00100000)) {
+ to_bitField0_ |= 0x00020000;
+ }
+ result.txnidMostBits_ = txnidMostBits_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -3999,6 +4055,12 @@ public final class PulsarApi {
if (other.hasMarkerType()) {
setMarkerType(other.getMarkerType());
}
+ if (other.hasTxnidLeastBits()) {
+ setTxnidLeastBits(other.getTxnidLeastBits());
+ }
+ if (other.hasTxnidMostBits()) {
+ setTxnidMostBits(other.getTxnidMostBits());
+ }
return this;
}
@@ -4153,6 +4215,16 @@ public final class PulsarApi {
markerType_ = input.readInt32();
break;
}
+ case 176: {
+ bitField0_ |= 0x00080000;
+ txnidLeastBits_ = input.readUInt64();
+ break;
+ }
+ case 184: {
+ bitField0_ |= 0x00100000;
+ txnidMostBits_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -4801,6 +4873,48 @@ public final class PulsarApi {
return this;
}
+ // optional uint64 txnid_least_bits = 22 [default = 0];
+ private long txnidLeastBits_ ;
+ public boolean hasTxnidLeastBits() {
+ return ((bitField0_ & 0x00080000) == 0x00080000);
+ }
+ public long getTxnidLeastBits() {
+ return txnidLeastBits_;
+ }
+ public Builder setTxnidLeastBits(long value) {
+ bitField0_ |= 0x00080000;
+ txnidLeastBits_ = value;
+
+ return this;
+ }
+ public Builder clearTxnidLeastBits() {
+ bitField0_ = (bitField0_ & ~0x00080000);
+ txnidLeastBits_ = 0L;
+
+ return this;
+ }
+
+ // optional uint64 txnid_most_bits = 23 [default = 0];
+ private long txnidMostBits_ ;
+ public boolean hasTxnidMostBits() {
+ return ((bitField0_ & 0x00100000) == 0x00100000);
+ }
+ public long getTxnidMostBits() {
+ return txnidMostBits_;
+ }
+ public Builder setTxnidMostBits(long value) {
+ bitField0_ |= 0x00100000;
+ txnidMostBits_ = value;
+
+ return this;
+ }
+ public Builder clearTxnidMostBits() {
+ bitField0_ = (bitField0_ & ~0x00100000);
+ txnidMostBits_ = 0L;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
index 6af8d41..dbab061 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
@@ -15,6 +15,8 @@ public final class PulsarMarkers {
REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE(2, 11),
REPLICATED_SUBSCRIPTION_SNAPSHOT(3, 12),
REPLICATED_SUBSCRIPTION_UPDATE(4, 13),
+ TXN_COMMIT(5, 20),
+ TXN_ABORT(6, 21),
;
public static final int UNKNOWN_MARKER_VALUE = 0;
@@ -22,6 +24,8 @@ public final class PulsarMarkers {
public static final int REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE = 11;
public static final int REPLICATED_SUBSCRIPTION_SNAPSHOT_VALUE = 12;
public static final int REPLICATED_SUBSCRIPTION_UPDATE_VALUE = 13;
+ public static final int TXN_COMMIT_VALUE = 20;
+ public static final int TXN_ABORT_VALUE = 21;
public final int getNumber() { return value; }
@@ -33,6 +37,8 @@ public final class PulsarMarkers {
case 11: return REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE;
case 12: return REPLICATED_SUBSCRIPTION_SNAPSHOT;
case 13: return REPLICATED_SUBSCRIPTION_UPDATE;
+ case 20: return TXN_COMMIT;
+ case 21: return TXN_ABORT;
default: return null;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 6d495be..c2c1f57 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -254,4 +254,49 @@ public class Markers {
inStream.recycle();
}
}
+
+ public static boolean isTxnCommitMarker(MessageMetadata msgMetadata) {
+ return msgMetadata != null
+ && msgMetadata.hasMarkerType()
+ && msgMetadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE;
+ }
+
+ public static ByteBuf newTxnCommitMarker(long sequenceId, long txnMostBits,
+ long txnLeastBits) {
+ return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits);
+ }
+
+ public static boolean isTxnAbortMarker(MessageMetadata msgMetadata) {
+ return msgMetadata != null
+ && msgMetadata.hasMarkerType()
+ && msgMetadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE;
+ }
+
+ public static ByteBuf newTxnAbortMarker(long sequenceId, long txnMostBits,
+ long txnLeastBits) {
+ return newTxnMarker(MarkerType.TXN_ABORT, sequenceId, txnMostBits, txnLeastBits);
+ }
+
+ private static ByteBuf newTxnMarker(MarkerType markerType, long sequenceId, long txnMostBits,
+ long txnLeastBits) {
+ MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
+ msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
+ msgMetadataBuilder.setProducerName("pulsar.txn.marker");
+ msgMetadataBuilder.setSequenceId(sequenceId);
+ msgMetadataBuilder.setMarkerType(markerType.getNumber());
+ msgMetadataBuilder.setTxnidMostBits(txnMostBits);
+ msgMetadataBuilder.setTxnidLeastBits(txnLeastBits);
+
+ MessageMetadata msgMetadata = msgMetadataBuilder.build();
+
+ ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer();
+
+ try {
+ return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
+ } finally {
+ payload.release();
+ msgMetadata.recycle();
+ msgMetadataBuilder.recycle();
+ }
+ }
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b9c7754..ce86f96 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -123,6 +123,10 @@ message MessageMetadata {
// internal metadata instead of application published data.
// Markers will generally not be propagated back to clients
optional int32 marker_type = 20;
+
+ // transaction related message info
+ optional uint64 txnid_least_bits = 22 [default = 0];
+ optional uint64 txnid_most_bits = 23 [default = 0];
}
message SingleMessageMetadata {
diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto
index 9243361..f86cba1 100644
--- a/pulsar-common/src/main/proto/PulsarMarkers.proto
+++ b/pulsar-common/src/main/proto/PulsarMarkers.proto
@@ -32,6 +32,8 @@ enum MarkerType {
REPLICATED_SUBSCRIPTION_UPDATE = 13;
// Next markers start at 20
+ TXN_COMMIT = 20;
+ TXN_ABORT = 21;
}
/// --- Replicated subscriptions ---
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index da0bb36..795c8ca 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
@@ -115,4 +116,36 @@ public class MarkersTest {
assertEquals(snapshot.getClusters(1).getMessageId().getEntryId(), 11);
}
+ @Test
+ public void testTxnCommitMarker() {
+ long sequenceId = 1L;
+ long mostBits = 1234L;
+ long leastBits = 2345L;
+
+ ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits);
+
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
+
+ assertEquals(msgMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_COMMIT_VALUE);
+ assertEquals(msgMetadata.getSequenceId(), sequenceId);
+ assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
+ assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+ }
+
+ @Test
+ public void testTxnAbortMarker() {
+ long sequenceId = 1L;
+ long mostBits = 1234L;
+ long leastBits = 2345L;
+
+ ByteBuf buf = Markers.newTxnAbortMarker(sequenceId, mostBits, leastBits);
+
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
+
+ assertEquals(msgMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_ABORT_VALUE);
+ assertEquals(msgMetadata.getSequenceId(), sequenceId);
+ assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
+ assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+ }
+
}