You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/06 03:32:21 UTC

[pulsar] branch master updated: [Transaction] Add default handler to handle transaction related commands (#4891)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 822d8ec  [Transaction] Add default handler to handle transaction related commands (#4891)
822d8ec is described below

commit 822d8ecacce9ac368250a863c35f238d5f40fda1
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 6 11:32:16 2019 +0800

    [Transaction] Add default handler to handle transaction related commands (#4891)
    
    ---
    
    *Motivation*
    
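    Add default handlers to PulsarDecoder for the transaction-related commands. Each default handler throws UnsupportedOperationException, so subclasses override the ones they support.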
---
 .../pulsar/common/protocol/PulsarDecoder.java      | 134 +++++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 9e45c67..904cd95 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -28,6 +28,10 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddSubscriptionToTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddSubscriptionToTxnResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
@@ -36,6 +40,12 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnPartition;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnSubscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
@@ -45,6 +55,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceR
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxnResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPing;
@@ -312,6 +324,78 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
                 handleAuthResponse(cmd.getAuthResponse());
                 cmd.getAuthResponse().recycle();
                 break;
+
+            case NEW_TXN:
+                checkArgument(cmd.hasNewTxn());
+                handleNewTxn(cmd.getNewTxn());
+                cmd.getNewTxn().recycle();
+                break;
+
+            case NEW_TXN_RESPONSE:
+                checkArgument(cmd.hasNewTxnResponse());
+                handleNewTxnResponse(cmd.getNewTxnResponse());
+                cmd.getNewTxnResponse().recycle();
+                break;
+
+            case ADD_PARTITION_TO_TXN:
+                checkArgument(cmd.hasAddPartitionToTxn());
+                handleAddPartitionToTxn(cmd.getAddPartitionToTxn());
+                cmd.getAddPartitionToTxn().recycle();
+                break;
+
+            case ADD_PARTITION_TO_TXN_RESPONSE:
+                checkArgument(cmd.hasAddPartitionToTxnResponse());
+                handleAddPartitionToTxnResponse(cmd.getAddPartitionToTxnResponse());
+                cmd.getAddPartitionToTxnResponse().recycle();
+                break;
+
+            case ADD_SUBSCRIPTION_TO_TXN:
+                checkArgument(cmd.hasAddSubscriptionToTxn());
+                handleAddSubscriptionToTxn(cmd.getAddSubscriptionToTxn());
+                cmd.getAddSubscriptionToTxn().recycle();
+                break;
+
+            case ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
+                checkArgument(cmd.hasAddSubscriptionToTxnResponse());
+                handleAddSubsciptionToTxnResponse(cmd.getAddSubscriptionToTxnResponse());
+                cmd.getAddSubscriptionToTxnResponse().recycle();
+                break;
+
+            case END_TXN:
+                checkArgument(cmd.hasEndTxn());
+                handleEndTxn(cmd.getEndTxn());
+                cmd.getEndTxn().recycle();
+                break;
+
+            case END_TXN_RESPONSE:
+                checkArgument(cmd.hasEndTxnResponse());
+                handleEndTxnResponse(cmd.getEndTxnResponse());
+                cmd.getEndTxnResponse().recycle();
+                break;
+
+            case END_TXN_ON_PARTITION:
+                checkArgument(cmd.hasEndTxnOnPartition());
+                handleEndTxnOnPartition(cmd.getEndTxnOnPartition());
+                cmd.getEndTxnOnPartition().recycle();
+                break;
+
+            case END_TXN_ON_PARTITION_RESPONSE:
+                checkArgument(cmd.hasEndTxnOnPartitionResponse());
+                handleEndTxnOnPartitionResponse(cmd.getEndTxnOnPartitionResponse());
+                cmd.getEndTxnOnPartitionResponse().recycle();
+                break;
+
+            case END_TXN_ON_SUBSCRIPTION:
+                checkArgument(cmd.hasEndTxnOnSubscription());
+                handleEndTxnOnSubscription(cmd.getEndTxnOnSubscription());
+                cmd.getEndTxnOnSubscription().recycle();
+                break;
+
+            case END_TXN_ON_SUBSCRIPTION_RESPONSE:
+                checkArgument(cmd.hasEndTxnOnSubscriptionResponse());
+                handleEndTxnOnsubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse());
+                cmd.getEndTxnOnSubscriptionResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -471,5 +555,55 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleNewTxn(CommandNewTxn commandNewTxn) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleNewTxnResponse(CommandNewTxnResponse commandNewTxnResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleAddPartitionToTxn(CommandAddPartitionToTxn commandAddPartitionToTxn) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleAddPartitionToTxnResponse(CommandAddPartitionToTxnResponse commandAddPartitionToTxnResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn commandAddSubscriptionToTxn) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleAddSubsciptionToTxnResponse(
+        CommandAddSubscriptionToTxnResponse commandAddSubscriptionToTxnResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleEndTxn(CommandEndTxn commandEndTxn) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleEndTxnResponse(CommandEndTxnResponse commandEndTxnResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleEndTxnOnPartition(CommandEndTxnOnPartition commandEndTxnOnPartition) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleEndTxnOnPartitionResponse(CommandEndTxnOnPartitionResponse commandEndTxnOnPartitionResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription commandEndTxnOnSubscription) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleEndTxnOnsubscriptionResponse(
+        CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class);
 }