You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/30 14:48:18 UTC

[pulsar] branch master updated: [Transaction][buffer] Add data ledger position in txn commit marker (#4826)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a06c97  [Transaction][buffer] Add data ledger position in txn commit marker (#4826)
3a06c97 is described below

commit 3a06c97a10cc4dcf42b62f41a6add0d6ffa01ca4
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jul 30 22:48:13 2019 +0800

    [Transaction][buffer] Add data ledger position in txn commit marker (#4826)
    
    * [Transaction][buffer] Add data ledger position in txn commit marker
    ---
    
    *Motivation*
    
    Add the data ledger position into the commit marker of the transaction log.
    
    *Modifications*
    
    Create a new TxnCommitMarker message in PulsarMarkers.proto and write it as the payload of the commit marker.
---
 .../pulsar/common/api/proto/PulsarMarkers.java     | 363 +++++++++++++++++++++
 .../org/apache/pulsar/common/protocol/Markers.java |  37 ++-
 pulsar-common/src/main/proto/PulsarMarkers.proto   |   6 +
 .../apache/pulsar/common/protocol/MarkersTest.java |  10 +-
 4 files changed, 409 insertions(+), 7 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
index dbab061..5ef2434 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
@@ -2986,6 +2986,369 @@ public final class PulsarMarkers {
     // @@protoc_insertion_point(class_scope:pulsar.proto.MessageIdData)
   }
   
+  public interface TxnCommitMarkerOrBuilder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    // Read-only accessors implemented by both TxnCommitMarker and TxnCommitMarker.Builder.
+    // required .pulsar.proto.MessageIdData message_id = 1;
+    boolean hasMessageId(); // true once message_id has been set (has-bit 0x00000001 in the implementations)
+    org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData getMessageId();
+  }
+  public static final class TxnCommitMarker extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements TxnCommitMarkerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Message with one required field, message_id. Use TxnCommitMarker.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle; // stays null for defaultInstance, which is built via the noInit constructor
+    private TxnCommitMarker(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    // Recycler from which Builder.buildPartial() obtains TxnCommitMarker instances.
+     private static final io.netty.util.Recycler<TxnCommitMarker> RECYCLER = new io.netty.util.Recycler<TxnCommitMarker>() {
+            protected TxnCommitMarker newObject(Handle handle) {
+              return new TxnCommitMarker(handle);
+            }
+          };
+        // Resets the fields and memoized state, then hands this instance back to RECYCLER (skipped when handle is null).
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         // Called by the static initializer below to create defaultInstance; no recycler handle is attached.
+    private TxnCommitMarker(boolean noInit) {}
+    
+    private static final TxnCommitMarker defaultInstance;
+    public static TxnCommitMarker getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public TxnCommitMarker getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    // Bit 0x00000001 of bitField0_ records whether message_id has been set.
+    private int bitField0_;
+    // required .pulsar.proto.MessageIdData message_id = 1;
+    public static final int MESSAGE_ID_FIELD_NUMBER = 1;
+    private org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData messageId_;
+    public boolean hasMessageId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData getMessageId() {
+      return messageId_;
+    }
+    
+    private void initFields() {
+      messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = not initialized, 1 = initialized
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasMessageId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!getMessageId().isInitialized()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    // Writing to a plain protobuf CodedOutputStream is not supported; this overload always throws RuntimeException.
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    // Writes message_id (field 1) to the ByteBuf-backed stream when its has-bit is set.
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // return value unused; the call fills memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeMessage(1, messageId_);
+      }
+    }
+    // The serialized size is cached in memoizedSerializedSize (-1 = not computed yet); recycle() resets it.
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(1, messageId_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    // The ByteString overloads are disabled and throw RuntimeException("Disabled"); the others merge into a new Builder and call buildParsed().
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(byte[] data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseDelimitedFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    // Builder for TxnCommitMarker; instances come from a Recycler via create() and go back to it through recycle().
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarkerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      // Clears the builder state and hands the builder back to RECYCLER (skipped when handle is null).
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.getDefaultInstance();
+      }
+      // Like buildPartial(), but throws the exception from newUninitializedMessageException when the result is not initialized.
+      public org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker build() {
+        org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      // Same check as build(), with the failure converted to InvalidProtocolBufferException; used by the parseFrom overloads.
+      private org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker buildParsed()
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      // Takes a TxnCommitMarker from its recycler and copies messageId_ and the has-bit into it; the builder is not cleared.
+      public org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker result = org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.messageId_ = messageId_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker other) {
+        if (other == org.apache.pulsar.common.api.proto.PulsarMarkers.TxnCommitMarker.getDefaultInstance()) return this;
+        if (other.hasMessageId()) {
+          mergeMessageId(other.getMessageId());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasMessageId()) {
+          
+          return false;
+        }
+        if (!getMessageId().isInitialized()) {
+          
+          return false;
+        }
+        return true;
+      }
+      // Parsing from a protobuf CodedInputStream is disabled; this overload always throws IOException.
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) { // read fields until tag 0 or until skipField() reports that it could not skip
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: { // tag 10 = field number 1 (message_id) with wire type 2 (length-delimited)
+              org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.newBuilder();
+              if (hasMessageId()) {
+                subBuilder.mergeFrom(getMessageId());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setMessageId(subBuilder.buildPartial());
+              subBuilder.recycle(); // the built message is already stored by setMessageId above
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required .pulsar.proto.MessageIdData message_id = 1;
+      private org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+      public boolean hasMessageId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData getMessageId() {
+        return messageId_;
+      }
+      public Builder setMessageId(org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        messageId_ = value;
+        
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder setMessageId(
+          org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.Builder builderForValue) {
+        messageId_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder mergeMessageId(org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData value) { // merges into an already-set message_id, otherwise adopts value as-is
+        if (((bitField0_ & 0x00000001) == 0x00000001) &&
+            messageId_ != org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance()) {
+          messageId_ =
+            org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.newBuilder(messageId_).mergeFrom(value).buildPartial();
+        } else {
+          messageId_ = value;
+        }
+        
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder clearMessageId() {
+        messageId_ = org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.TxnCommitMarker)
+    }
+    // defaultInstance is created without a recycler handle and then given default field values.
+    static {
+      defaultInstance = new TxnCommitMarker(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.TxnCommitMarker)
+  }
+  
   
   static {
   }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index c2c1f57..597ae9b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import lombok.SneakyThrows;
 import lombok.experimental.UtilityClass;
 
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
@@ -262,8 +263,8 @@ public class Markers {
     }
 
     public static ByteBuf newTxnCommitMarker(long sequenceId, long txnMostBits,
-                                             long txnLeastBits) {
-        return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits);
+                                             long txnLeastBits, MessageIdData messageIdData) { // messageIdData is serialized as the marker payload (TxnCommitMarker.message_id)
+        return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits, Optional.of(messageIdData)); // Optional.of throws NullPointerException when messageIdData is null
     }
 
     public static boolean isTxnAbortMarker(MessageMetadata msgMetadata) {
@@ -274,11 +275,26 @@ public class Markers {
 
     public static ByteBuf newTxnAbortMarker(long sequenceId, long txnMostBits,
                                             long txnLeastBits) {
-        return newTxnMarker(MarkerType.TXN_ABORT, sequenceId, txnMostBits, txnLeastBits);
+        return newTxnMarker(MarkerType.TXN_ABORT, sequenceId, txnMostBits, txnLeastBits, Optional.empty()); // no message id: newTxnMarker writes nothing into the payload buffer
     }
 
+    public static PulsarMarkers.TxnCommitMarker parseCommitMarker(ByteBuf payload) throws IOException {
+        ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload); // reads the TxnCommitMarker bytes from payload
+        PulsarMarkers.TxnCommitMarker.Builder builder = null;
+        try {
+            builder = PulsarMarkers.TxnCommitMarker.newBuilder();
+            return builder.mergeFrom(inStream, null).build(); // build() throws when the required message_id is missing
+        } finally {
+            if (builder != null) { // newBuilder() may have thrown; do not mask that failure with a NullPointerException
+                builder.recycle();
+            }
+            inStream.recycle(); // always reached, including when the builder was never obtained
+        }
+    }
+
+    @SneakyThrows
     private static ByteBuf newTxnMarker(MarkerType markerType, long sequenceId, long txnMostBits,
-                                        long txnLeastBits) {
+                                        long txnLeastBits, Optional<MessageIdData> messageIdData) {
         MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
         msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
         msgMetadataBuilder.setProducerName("pulsar.txn.marker");
@@ -289,7 +305,18 @@ public class Markers {
 
         MessageMetadata msgMetadata = msgMetadataBuilder.build();
 
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer();
+        ByteBuf payload;
+        if (messageIdData.isPresent()) {
+            PulsarMarkers.TxnCommitMarker commitMarker = PulsarMarkers.TxnCommitMarker.newBuilder()
+                                                                                      .setMessageId(messageIdData.get())
+                                                                                      .build();
+            int size = commitMarker.getSerializedSize();
+            payload = PooledByteBufAllocator.DEFAULT.buffer(size);
+            ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
+            commitMarker.writeTo(outStream);
+        } else {
+            payload = PooledByteBufAllocator.DEFAULT.buffer();
+        }
 
         try {
             return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto
index f86cba1..8e2ac21 100644
--- a/pulsar-common/src/main/proto/PulsarMarkers.proto
+++ b/pulsar-common/src/main/proto/PulsarMarkers.proto
@@ -80,3 +80,9 @@ message MessageIdData {
 	required uint64 ledger_id = 1;
 	required uint64 entry_id  = 2;
 }
+
+
+/// --- Transaction marker ---
+message TxnCommitMarker {
+    required MessageIdData message_id = 1; // (ledger_id, entry_id) pair; per the commit message, the data ledger position
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index 795c8ca..1427f86 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -117,12 +118,13 @@ public class MarkersTest {
     }
 
     @Test
-    public void testTxnCommitMarker() {
+    public void testTxnCommitMarker() throws IOException {
         long sequenceId = 1L;
         long mostBits = 1234L;
         long leastBits = 2345L;
 
-        ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits);
+        ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits,
+                                                 MessageIdData.newBuilder().setLedgerId(10).setEntryId(11).build());
 
         MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
 
@@ -130,6 +132,10 @@ public class MarkersTest {
         assertEquals(msgMetadata.getSequenceId(), sequenceId);
         assertEquals(msgMetadata.getTxnidMostBits(), mostBits);
         assertEquals(msgMetadata.getTxnidLeastBits(), leastBits);
+
+        PulsarMarkers.TxnCommitMarker marker = Markers.parseCommitMarker(buf);
+        assertEquals(marker.getMessageId().getLedgerId(), 10);
+        assertEquals(marker.getMessageId().getEntryId(), 11);
     }
 
     @Test