Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/20 03:01:53 UTC

[GitHub] [pulsar] gaoran10 opened a new pull request #7856: [Transaction] Handle Acknowledge In The Transaction

gaoran10 opened a new pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856


   Fix https://github.com/streamnative/pulsar/issues/1307
   
   Master Issue: #2664 
   
   ### Motivation
   
   Handle message acknowledgments made within a transaction.
   
   ### Modifications
   
   1. Register the subscription with the transaction metadata store.
   2. Make the ack command carry the transaction information.
   3. When committing a transaction, commit every subscription in the transaction.
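The third modification can be sketched roughly as follows. This is a minimal illustration, not the broker code: `commitOn` is a hypothetical stand-in for a per-subscription commit call, and each subscription is modeled as a plain topic/subscription string pair. It only shows the fan-out pattern of starting one commit per subscription and waiting for all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CommitAllSubscriptionsSketch {

    // Hypothetical stand-in for a per-subscription commit; completes immediately here.
    static CompletableFuture<String> commitOn(String topic, String subscription) {
        return CompletableFuture.completedFuture(topic + "/" + subscription);
    }

    // Starts one commit per (topic, subscription) pair and completes when all have finished.
    static CompletableFuture<List<String>> commitAll(List<String[]> subscriptions) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String[] sub : subscriptions) {
            futures.add(commitOn(sub[0], sub[1]));
        }
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<String> committed = new ArrayList<>();
                    for (CompletableFuture<String> f : futures) {
                        committed.add(f.join()); // safe: allOf guarantees completion
                    }
                    return committed;
                });
    }

    public static void main(String[] args) {
        List<String[]> subs = new ArrayList<>();
        subs.add(new String[] {"topic-1", "sub-a"});
        subs.add(new String[] {"topic-2", "sub-b"});
        System.out.println(commitAll(subs).join());
    }
}
```

If any single commit fails, the combined future fails as well, so the caller sees one result for the whole transaction.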
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
   - *org.apache.pulsar.broker.transaction.TransactionProduceTest.ackCommitTest*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (yes)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#issuecomment-678019819


   /pulsarbot run-failure-checks


----------------------------------------------------------------



[GitHub] [pulsar] gaoran10 commented on a change in pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#discussion_r482867197



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -1197,8 +1207,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
         // Reset txdID and position for cumulative ack.
         PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
         POSITION_UPDATER.set(this, null);
-        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>)
-                                                                    (List<?>)pendingAckMessageForCurrentTxn.values());
+//        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>)
+//                                                                    (List<?>)pendingAckMessageForCurrentTxn.values());

Review comment:
       No, I'll revert this.

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.base.Objects;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A class for representing acked topic subscription info.
+ */
+@Data
+@Builder
+public class TransactionSubscription implements Comparable<TransactionSubscription> {

Review comment:
       OK




----------------------------------------------------------------



[GitHub] [pulsar] codelipenghui commented on pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#issuecomment-686887142


   /pulsarbot run-failure-checks


----------------------------------------------------------------



[GitHub] [pulsar] gaoran10 commented on pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#issuecomment-684167305


   /pulsarbot run-failure-checks


----------------------------------------------------------------



[GitHub] [pulsar] codelipenghui merged pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856


   


----------------------------------------------------------------



[GitHub] [pulsar] gaoran10 commented on a change in pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#discussion_r482867294



##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.base.Objects;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A class for representing acked topic subscription info.
+ */
+@Data
+@Builder
+public class TransactionSubscription implements Comparable<TransactionSubscription> {

Review comment:
       There is a problem: I can't override the `hashCode` method of `Subscription`, so it can't be used in a `Set`.
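To illustrate why a dedicated class helps here, the sketch below uses a hypothetical `SubscriptionKey` value type (not the class from the PR) whose `equals` and `hashCode` are derived from the topic and subscription name. With both methods defined consistently, a `HashSet` treats two instances describing the same subscription as one entry.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class SubscriptionKeySketch {

    // Hypothetical value type; equality is defined by topic and subscription name.
    static final class SubscriptionKey {
        final String topic;
        final String subscription;

        SubscriptionKey(String topic, String subscription) {
            this.topic = topic;
            this.subscription = subscription;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SubscriptionKey)) {
                return false;
            }
            SubscriptionKey other = (SubscriptionKey) o;
            return Objects.equals(topic, other.topic)
                    && Objects.equals(subscription, other.subscription);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, subscription);
        }
    }

    // Adds the same logical subscription twice and reports how many entries the set keeps.
    static int sizeAfterDuplicateAdd() {
        Set<SubscriptionKey> acked = new HashSet<>();
        acked.add(new SubscriptionKey("persistent://tenant/ns/topic", "sub-a"));
        acked.add(new SubscriptionKey("persistent://tenant/ns/topic", "sub-a"));
        return acked.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterDuplicateAdd());
    }
}
```

A type that only inherits identity-based `hashCode` from `Object` would keep both entries, which is the problem described in the comment.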




----------------------------------------------------------------



[GitHub] [pulsar] gaoran10 commented on a change in pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#discussion_r482848567



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -176,6 +178,30 @@ public void addPublishPartitionToTxn(TxnID txnID, List<String> partitions) throw
         return handler.addPublishPartitionToTxnAsync(txnID, partitions);
     }
 
+    @Override
+    public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException {
+        try {
+            addSubscriptionToTxnAsync(txnID, topic, subscription);

Review comment:
       I missed the call to `get()`.
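A minimal sketch of the difference, using only `java.util.concurrent`: `registerAsync` is a hypothetical stand-in for the async method, and the exception wrapping is illustrative (the PR itself unwraps with `TransactionCoordinatorClientException.unwrap(e)`). Without `get()`, the synchronous wrapper returns as soon as the future is created; with it, the wrapper blocks until the work is done and any failure reaches the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class SyncOverAsyncSketch {

    // Hypothetical stand-in for the async call: the work runs on another thread.
    static CompletableFuture<Void> registerAsync(AtomicBoolean registered) {
        return CompletableFuture.runAsync(() -> registered.set(true));
    }

    // Blocking variant: get() waits for the future and surfaces any failure to the caller.
    static void register(AtomicBoolean registered) {
        try {
            registerAsync(registered).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while registering", e);
        } catch (ExecutionException e) {
            // Illustrative unwrapping only; a real client would map this to its own exception type.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        AtomicBoolean registered = new AtomicBoolean(false);
        register(registered);
        System.out.println("registered = " + registered.get());
    }
}
```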




----------------------------------------------------------------



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#discussion_r482094605



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -176,6 +178,30 @@ public void addPublishPartitionToTxn(TxnID txnID, List<String> partitions) throw
         return handler.addPublishPartitionToTxnAsync(txnID, partitions);
     }
 
+    @Override
+    public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException {
+        try {
+            addSubscriptionToTxnAsync(txnID, topic, subscription);

Review comment:
       The method returns before the subscription has been added to the txn. Is this the expected behavior?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -176,6 +178,30 @@ public void addPublishPartitionToTxn(TxnID txnID, List<String> partitions) throw
         return handler.addPublishPartitionToTxnAsync(txnID, partitions);
     }
 
+    @Override
+    public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException {
+        try {
+            addSubscriptionToTxnAsync(txnID, topic, subscription);
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, String topic, String subscription) {
+        TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits());
+        if (handler == null) {
+            return FutureUtil.failedFuture(
+                    new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits()));
+        }
+        PulsarApi.Subscription sub = PulsarApi.Subscription.newBuilder()
+                .setTopic(topic)
+                .setSubscription(subscription)
+                .build();
+        return handler.addSubscriptionToTxn(txnID, Lists.newArrayList(sub));

Review comment:
       A singleton list is more suitable here. Alternatively, you could use a thread-local list to avoid creating many list instances.
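As a small illustration of the singleton-list suggestion, the sketch below uses the JDK's `Collections.singletonList`; the `single` helper and the string element are hypothetical stand-ins for the `Subscription` value in the diff. One caveat worth checking before adopting it: the returned list is immutable, so it only fits if the callee never modifies the list it receives.

```java
import java.util.Collections;
import java.util.List;

class SingletonListSketch {

    // Wraps one element without allocating a growable backing array.
    static List<String> single(String subscription) {
        return Collections.singletonList(subscription);
    }

    public static void main(String[] args) {
        List<String> subs = single("my-sub");
        System.out.println(subs.size() + " element: " + subs.get(0));
        try {
            subs.add("another-sub");
        } catch (UnsupportedOperationException e) {
            // The list is immutable, so mutation attempts fail fast.
            System.out.println("singleton lists reject add()");
        }
    }
}
```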

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -1197,8 +1207,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
         // Reset txdID and position for cumulative ack.
         PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
         POSITION_UPDATER.set(this, null);
-        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>)
-                                                                    (List<?>)pendingAckMessageForCurrentTxn.values());
+//        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>)
+//                                                                    (List<?>)pendingAckMessageForCurrentTxn.values());

Review comment:
       Is this a useful change?

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.base.Objects;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A class for representing acked topic subscription info.
+ */
+@Data
+@Builder
+public class TransactionSubscription implements Comparable<TransactionSubscription> {

Review comment:
       PulsarApi.proto already defines the same struct, named `Subscription`; maybe you can use that one directly.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -207,26 +208,39 @@ public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
 
     private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
         CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
+        List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
         this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
             if (throwable != null) {
                 resultFuture.completeExceptionally(throwable);
                 return;
             }
+
+            txnMeta.ackedPartitions().forEach(tbSub -> {
+                CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
+                if (TxnStatus.COMMITTING.equals(newStatus)) {
+                    commitFuture = tbClient.commitTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                } else if (TxnStatus.ABORTING.equals(newStatus)) {
+                    commitFuture = tbClient.abortTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                }
+                completableFutureList.add(commitFuture);
+            });
+
             txnMeta.producedPartitions().forEach(partition -> {
                 CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
                 if (TxnStatus.COMMITTING.equals(newStatus)) {
                     commitFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
+                    // TODO commitTxnOnSubscription
                 } else if (TxnStatus.ABORTING.equals(newStatus)) {
+                    // TODO abortTxnOnTopic
                     commitFuture.completeExceptionally(new Throwable("Unsupported operation."));
-                } else {
-                    // Unsupported txnStatus
-                    commitFuture.completeExceptionally(new Throwable("Unsupported txnStatus."));

Review comment:
       Why remove this branch?
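One possible reason to keep the branch, sketched under simplified assumptions: in the quoted code the future is created up front and only completed inside the status checks, so a status that matches neither check would leave it incomplete, and anything waiting on it would never finish. The enum and method below are hypothetical and only model that control flow.

```java
import java.util.concurrent.CompletableFuture;

class EndTxnBranchSketch {

    // Hypothetical, reduced set of statuses for illustration.
    enum Status { OPEN, COMMITTING, ABORTING }

    // Every path completes the future, so callers waiting on it cannot hang.
    static CompletableFuture<String> endOnPartition(Status newStatus) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (newStatus == Status.COMMITTING) {
            future.complete("committed");
        } else if (newStatus == Status.ABORTING) {
            future.complete("aborted");
        } else {
            // Without this branch the future would stay incomplete for other statuses.
            future.completeExceptionally(
                    new IllegalStateException("Unsupported txnStatus: " + newStatus));
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println(endOnPartition(Status.COMMITTING).join());
        System.out.println(endOnPartition(Status.OPEN).isCompletedExceptionally());
    }
}
```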




----------------------------------------------------------------



[GitHub] [pulsar] gaoran10 commented on a change in pull request #7856: [Transaction] Handle Acknowledge In The Transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#discussion_r482867544



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -207,26 +208,39 @@ public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
 
     private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
         CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
+        List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
         this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
             if (throwable != null) {
                 resultFuture.completeExceptionally(throwable);
                 return;
             }
+
+            txnMeta.ackedPartitions().forEach(tbSub -> {
+                CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
+                if (TxnStatus.COMMITTING.equals(newStatus)) {
+                    commitFuture = tbClient.commitTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                } else if (TxnStatus.ABORTING.equals(newStatus)) {
+                    commitFuture = tbClient.abortTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                }
+                completableFutureList.add(commitFuture);
+            });
+
             txnMeta.producedPartitions().forEach(partition -> {
                 CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
                 if (TxnStatus.COMMITTING.equals(newStatus)) {
                     commitFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
+                    // TODO commitTxnOnSubscription
                 } else if (TxnStatus.ABORTING.equals(newStatus)) {
+                    // TODO abortTxnOnTopic
                     commitFuture.completeExceptionally(new Throwable("Unsupported operation."));
-                } else {
-                    // Unsupported txnStatus
-                    commitFuture.completeExceptionally(new Throwable("Unsupported txnStatus."));

Review comment:
       I'll revert this.




----------------------------------------------------------------