You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/24 01:50:35 UTC

[pulsar] branch master updated: [Transaction][Buffer]Add new marker to show which message belongs to transaction (#4776)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e8a95e5  [Transaction][Buffer]Add new marker to show which message belongs to transaction (#4776)
e8a95e5 is described below

commit e8a95e5ba57107c1e08a2610b7c55361a09f812b
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 24 09:50:27 2019 +0800

    [Transaction][Buffer]Add new marker to show which message belongs to transaction (#4776)
    
    * [Transaction][Buffer]Add new marker to show which message belongs to transaction
    ---
    
    *Motivation*
    
    Add new message types for transactions: transactional data messages, plus commit and abort markers in the transaction log.
    
    *Modifications*
    
    Add two new types of transaction messages.
    TXN_COMMIT is the commit marker of the transaction.
    TXN_ABORT is the abort marker of the transaction.
---
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 114 +++++++++++++++++++++
 .../pulsar/common/api/proto/PulsarMarkers.java     |   6 ++
 .../org/apache/pulsar/common/protocol/Markers.java |  45 ++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |   4 +
 pulsar-common/src/main/proto/PulsarMarkers.proto   |   2 +
 .../apache/pulsar/common/protocol/MarkersTest.java |  33 ++++++
 6 files changed, 204 insertions(+)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4c156e5..035f754 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3103,6 +3103,14 @@ public final class PulsarApi {
     // optional int32 marker_type = 20;
     boolean hasMarkerType();
     int getMarkerType();
+    
+    // optional uint64 txnid_least_bits = 22 [default = 0];
+    boolean hasTxnidLeastBits();
+    long getTxnidLeastBits();
+    
+    // optional uint64 txnid_most_bits = 23 [default = 0];
+    boolean hasTxnidMostBits();
+    long getTxnidMostBits();
   }
   public static final class MessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -3443,6 +3451,26 @@ public final class PulsarApi {
       return markerType_;
     }
     
+    // optional uint64 txnid_least_bits = 22 [default = 0];
+    public static final int TXNID_LEAST_BITS_FIELD_NUMBER = 22;
+    private long txnidLeastBits_;
+    public boolean hasTxnidLeastBits() {
+      return ((bitField0_ & 0x00010000) == 0x00010000);
+    }
+    public long getTxnidLeastBits() {
+      return txnidLeastBits_;
+    }
+    
+    // optional uint64 txnid_most_bits = 23 [default = 0];
+    public static final int TXNID_MOST_BITS_FIELD_NUMBER = 23;
+    private long txnidMostBits_;
+    public boolean hasTxnidMostBits() {
+      return ((bitField0_ & 0x00020000) == 0x00020000);
+    }
+    public long getTxnidMostBits() {
+      return txnidMostBits_;
+    }
+    
     private void initFields() {
       producerName_ = "";
       sequenceId_ = 0L;
@@ -3463,6 +3491,8 @@ public final class PulsarApi {
       orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
       deliverAtTime_ = 0L;
       markerType_ = 0;
+      txnidLeastBits_ = 0L;
+      txnidMostBits_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3562,6 +3592,12 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00008000) == 0x00008000)) {
         output.writeInt32(20, markerType_);
       }
+      if (((bitField0_ & 0x00010000) == 0x00010000)) {
+        output.writeUInt64(22, txnidLeastBits_);
+      }
+      if (((bitField0_ & 0x00020000) == 0x00020000)) {
+        output.writeUInt64(23, txnidMostBits_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -3651,6 +3687,14 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeInt32Size(20, markerType_);
       }
+      if (((bitField0_ & 0x00010000) == 0x00010000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(22, txnidLeastBits_);
+      }
+      if (((bitField0_ & 0x00020000) == 0x00020000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(23, txnidMostBits_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -3802,6 +3846,10 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00020000);
         markerType_ = 0;
         bitField0_ = (bitField0_ & ~0x00040000);
+        txnidLeastBits_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00080000);
+        txnidMostBits_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00100000);
         return this;
       }
       
@@ -3915,6 +3963,14 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00008000;
         }
         result.markerType_ = markerType_;
+        if (((from_bitField0_ & 0x00080000) == 0x00080000)) {
+          to_bitField0_ |= 0x00010000;
+        }
+        result.txnidLeastBits_ = txnidLeastBits_;
+        if (((from_bitField0_ & 0x00100000) == 0x00100000)) {
+          to_bitField0_ |= 0x00020000;
+        }
+        result.txnidMostBits_ = txnidMostBits_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -3999,6 +4055,12 @@ public final class PulsarApi {
         if (other.hasMarkerType()) {
           setMarkerType(other.getMarkerType());
         }
+        if (other.hasTxnidLeastBits()) {
+          setTxnidLeastBits(other.getTxnidLeastBits());
+        }
+        if (other.hasTxnidMostBits()) {
+          setTxnidMostBits(other.getTxnidMostBits());
+        }
         return this;
       }
       
@@ -4153,6 +4215,16 @@ public final class PulsarApi {
               markerType_ = input.readInt32();
               break;
             }
+            case 176: {
+              bitField0_ |= 0x00080000;
+              txnidLeastBits_ = input.readUInt64();
+              break;
+            }
+            case 184: {
+              bitField0_ |= 0x00100000;
+              txnidMostBits_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -4801,6 +4873,48 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint64 txnid_least_bits = 22 [default = 0];
+      private long txnidLeastBits_ ;
+      public boolean hasTxnidLeastBits() {
+        return ((bitField0_ & 0x00080000) == 0x00080000);
+      }
+      public long getTxnidLeastBits() {
+        return txnidLeastBits_;
+      }
+      public Builder setTxnidLeastBits(long value) {
+        bitField0_ |= 0x00080000;
+        txnidLeastBits_ = value;
+        
+        return this;
+      }
+      public Builder clearTxnidLeastBits() {
+        bitField0_ = (bitField0_ & ~0x00080000);
+        txnidLeastBits_ = 0L;
+        
+        return this;
+      }
+      
+      // optional uint64 txnid_most_bits = 23 [default = 0];
+      private long txnidMostBits_ ;
+      public boolean hasTxnidMostBits() {
+        return ((bitField0_ & 0x00100000) == 0x00100000);
+      }
+      public long getTxnidMostBits() {
+        return txnidMostBits_;
+      }
+      public Builder setTxnidMostBits(long value) {
+        bitField0_ |= 0x00100000;
+        txnidMostBits_ = value;
+        
+        return this;
+      }
+      public Builder clearTxnidMostBits() {
+        bitField0_ = (bitField0_ & ~0x00100000);
+        txnidMostBits_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
index 6af8d41..dbab061 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
@@ -15,6 +15,8 @@ public final class PulsarMarkers {
     REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE(2, 11),
     REPLICATED_SUBSCRIPTION_SNAPSHOT(3, 12),
     REPLICATED_SUBSCRIPTION_UPDATE(4, 13),
+    TXN_COMMIT(5, 20),
+    TXN_ABORT(6, 21),
     ;
     
     public static final int UNKNOWN_MARKER_VALUE = 0;
@@ -22,6 +24,8 @@ public final class PulsarMarkers {
     public static final int REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE = 11;
     public static final int REPLICATED_SUBSCRIPTION_SNAPSHOT_VALUE = 12;
     public static final int REPLICATED_SUBSCRIPTION_UPDATE_VALUE = 13;
+    public static final int TXN_COMMIT_VALUE = 20;
+    public static final int TXN_ABORT_VALUE = 21;
     
     
     public final int getNumber() { return value; }
@@ -33,6 +37,8 @@ public final class PulsarMarkers {
         case 11: return REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE;
         case 12: return REPLICATED_SUBSCRIPTION_SNAPSHOT;
         case 13: return REPLICATED_SUBSCRIPTION_UPDATE;
+        case 20: return TXN_COMMIT;
+        case 21: return TXN_ABORT;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 6d495be..c2c1f57 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -254,4 +254,49 @@ public class Markers {
             inStream.recycle();
         }
     }
+
+    public static boolean isTxnCommitMarker(MessageMetadata msgMetadata) {
+        return msgMetadata != null
+               && msgMetadata.hasMarkerType()
+               && msgMetadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE;
+    }
+
+    public static ByteBuf newTxnCommitMarker(long sequenceId, long txnMostBits,
+                                             long txnLeastBits) {
+        return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits);
+    }
+
+    public static boolean isTxnAbortMarker(MessageMetadata msgMetadata) {
+        return msgMetadata != null
+               && msgMetadata.hasMarkerType()
+               && msgMetadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE;
+    }
+
+    public static ByteBuf newTxnAbortMarker(long sequenceId, long txnMostBits,
+                                            long txnLeastBits) {
+        return newTxnMarker(MarkerType.TXN_ABORT, sequenceId, txnMostBits, txnLeastBits);
+    }
+
+    private static ByteBuf newTxnMarker(MarkerType markerType, long sequenceId, long txnMostBits,
+                                        long txnLeastBits) {
+        MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
+        msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
+        msgMetadataBuilder.setProducerName("pulsar.txn.marker");
+        msgMetadataBuilder.setSequenceId(sequenceId);
+        msgMetadataBuilder.setMarkerType(markerType.getNumber());
+        msgMetadataBuilder.setTxnidMostBits(txnMostBits);
+        msgMetadataBuilder.setTxnidLeastBits(txnLeastBits);
+
+        MessageMetadata msgMetadata = msgMetadataBuilder.build();
+
+        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer();
+
+        try {
+            return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
+        } finally {
+            payload.release();
+            msgMetadata.recycle();
+            msgMetadataBuilder.recycle();
+        }
+    }
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b9c7754..ce86f96 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -123,6 +123,10 @@ message MessageMetadata {
     // internal metadata instead of application published data.
     // Markers will generally not be propagated back to clients
     optional int32 marker_type = 20;
+
+    // transaction related message info
+    optional uint64 txnid_least_bits = 22 [default = 0];
+    optional uint64 txnid_most_bits = 23 [default = 0];
 }
 
 message SingleMessageMetadata {
diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto
index 9243361..f86cba1 100644
--- a/pulsar-common/src/main/proto/PulsarMarkers.proto
+++ b/pulsar-common/src/main/proto/PulsarMarkers.proto
@@ -32,6 +32,8 @@ enum MarkerType {
     REPLICATED_SUBSCRIPTION_UPDATE            = 13;
 
     // Next markers start at 20
+    TXN_COMMIT = 20;
+    TXN_ABORT = 21;
 }
 
 /// --- Replicated subscriptions ---
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index da0bb36..795c8ca 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
@@ -115,4 +116,36 @@ public class MarkersTest {
         assertEquals(snapshot.getClusters(1).getMessageId().getEntryId(), 11);
     }
 
+    @Test
+    public void testTxnCommitMarker() {
+        long sequenceId = 1L;
+        long mostBits = 1234L;
+        long leastBits = 2345L;
+
+        ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits);
+
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
+
+        assertEquals(msgMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_COMMIT_VALUE);
+        assertEquals(msgMetadata.getSequenceId(), sequenceId);
+        assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
+        assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+    }
+
+    @Test
+    public void testTxnAbortMarker() {
+        long sequenceId = 1L;
+        long mostBits = 1234L;
+        long leastBits = 2345L;
+
+        ByteBuf buf = Markers.newTxnAbortMarker(sequenceId, mostBits, leastBits);
+
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
+
+        assertEquals(msgMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_ABORT_VALUE);
+        assertEquals(msgMetadata.getSequenceId(), sequenceId);
+        assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
+        assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+    }
+
 }