You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/06 03:32:21 UTC
[pulsar] branch master updated: [Transactoin] Add default handler
to handle transaction related commands (#4891)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 822d8ec [Transactoin] Add default handler to handle transaction related commands (#4891)
822d8ec is described below
commit 822d8ecacce9ac368250a863c35f238d5f40fda1
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 6 11:32:16 2019 +0800
[Transactoin] Add default handler to handle transaction related commands (#4891)
---
*Motivation*
Add default handler to handle transaction related commands.
---
.../pulsar/common/protocol/PulsarDecoder.java | 134 +++++++++++++++++++++
1 file changed, 134 insertions(+)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 9e45c67..904cd95 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -28,6 +28,10 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddSubscriptionToTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
@@ -36,6 +40,12 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnPartition;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnSubscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
@@ -45,6 +55,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceR
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPing;
@@ -312,6 +324,78 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
handleAuthResponse(cmd.getAuthResponse());
cmd.getAuthResponse().recycle();
break;
+
+ case NEW_TXN:
+ checkArgument(cmd.hasNewTxn());
+ handleNewTxn(cmd.getNewTxn());
+ cmd.getNewTxn().recycle();
+ break;
+
+ case NEW_TXN_RESPONSE:
+ checkArgument(cmd.hasNewTxnResponse());
+ handleNewTxnResponse(cmd.getNewTxnResponse());
+ cmd.getNewTxnResponse().recycle();
+ break;
+
+ case ADD_PARTITION_TO_TXN:
+ checkArgument(cmd.hasAddPartitionToTxn());
+ handleAddPartitionToTxn(cmd.getAddPartitionToTxn());
+ cmd.getAddPartitionToTxn().recycle();
+ break;
+
+ case ADD_PARTITION_TO_TXN_RESPONSE:
+ checkArgument(cmd.hasAddPartitionToTxnResponse());
+ handleAddPartitionToTxnResponse(cmd.getAddPartitionToTxnResponse());
+ cmd.getAddPartitionToTxnResponse().recycle();
+ break;
+
+ case ADD_SUBSCRIPTION_TO_TXN:
+ checkArgument(cmd.hasAddSubscriptionToTxn());
+ handleAddSubscriptionToTxn(cmd.getAddSubscriptionToTxn());
+ cmd.getAddSubscriptionToTxn().recycle();
+ break;
+
+ case ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
+ checkArgument(cmd.hasAddSubscriptionToTxnResponse());
+ handleAddSubsciptionToTxnResponse(cmd.getAddSubscriptionToTxnResponse());
+ cmd.getAddSubscriptionToTxnResponse().recycle();
+ break;
+
+ case END_TXN:
+ checkArgument(cmd.hasEndTxn());
+ handleEndTxn(cmd.getEndTxn());
+ cmd.getEndTxn().recycle();
+ break;
+
+ case END_TXN_RESPONSE:
+ checkArgument(cmd.hasEndTxnResponse());
+ handleEndTxnResponse(cmd.getEndTxnResponse());
+ cmd.getEndTxnResponse().recycle();
+ break;
+
+ case END_TXN_ON_PARTITION:
+ checkArgument(cmd.hasEndTxnOnPartition());
+ handleEndTxnOnPartition(cmd.getEndTxnOnPartition());
+ cmd.getEndTxnOnPartition().recycle();
+ break;
+
+ case END_TXN_ON_PARTITION_RESPONSE:
+ checkArgument(cmd.hasEndTxnOnPartitionResponse());
+ handleEndTxnOnPartitionResponse(cmd.getEndTxnOnPartitionResponse());
+ cmd.getEndTxnOnPartitionResponse().recycle();
+ break;
+
+ case END_TXN_ON_SUBSCRIPTION:
+ checkArgument(cmd.hasEndTxnOnSubscription());
+ handleEndTxnOnSubscription(cmd.getEndTxnOnSubscription());
+ cmd.getEndTxnOnSubscription().recycle();
+ break;
+
+ case END_TXN_ON_SUBSCRIPTION_RESPONSE:
+ checkArgument(cmd.hasEndTxnOnSubscriptionResponse());
+ handleEndTxnOnsubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse());
+ cmd.getEndTxnOnSubscriptionResponse().recycle();
+ break;
}
} finally {
if (cmdBuilder != null) {
@@ -471,5 +555,55 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+ protected void handleNewTxn(CommandNewTxn commandNewTxn) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleNewTxnResponse(CommandNewTxnResponse commandNewTxnResponse) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn commandAddPartitionToTxn) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleAddPartitionToTxnResponse(CommandAddPartitionToTxnResponse commandAddPartitionToTxnResponse) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn commandAddSubscriptionToTxn) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleAddSubsciptionToTxnResponse(
+ CommandAddSubscriptionToTxnResponse commandAddSubscriptionToTxnResponse) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleEndTxn(CommandEndTxn commandEndTxn) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleEndTxnResponse(CommandEndTxnResponse commandEndTxnResponse) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition commandEndTxnOnPartition) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleEndTxnOnPartitionResponse(CommandEndTxnOnPartitionResponse commandEndTxnOnPartitionResponse) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription commandEndTxnOnSubscription) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleEndTxnOnsubscriptionResponse(
+ CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
+ throw new UnsupportedOperationException();
+ }
+
private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class);
}