Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/09/02 14:26:27 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #7856: [Transaction] Handle Acknowledge In The Transaction

codelipenghui commented on a change in pull request #7856:
URL: https://github.com/apache/pulsar/pull/7856#discussion_r482094605



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -176,6 +178,30 @@ public void addPublishPartitionToTxn(TxnID txnID, List<String> partitions) throw
         return handler.addPublishPartitionToTxnAsync(txnID, partitions);
     }
 
+    @Override
+    public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException {
+        try {
+            addSubscriptionToTxnAsync(txnID, topic, subscription);

Review comment:
       This method returns before the subscription has actually been added to the txn, because the future returned by `addSubscriptionToTxnAsync` is never waited on. Is this the expected behavior?
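
For illustration, here is a minimal sketch of a synchronous wrapper that waits for the async call before returning. It deliberately avoids the Pulsar classes: `SyncWrapperSketch`, the `String`-only signatures, and the empty-topic failure are hypothetical stand-ins, and the real method would presumably keep using `TransactionCoordinatorClientException.unwrap(e)` as in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class SyncWrapperSketch {

    // Hypothetical stand-in for addSubscriptionToTxnAsync in the diff.
    static CompletableFuture<Void> addSubscriptionToTxnAsync(String topic, String subscription) {
        if (topic == null || topic.isEmpty()) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("topic must not be empty"));
            return failed;
        }
        // Pretend the registration happens on another thread.
        return CompletableFuture.runAsync(() -> { });
    }

    // Synchronous variant: blocks until the async call has finished,
    // so both completion and failure are visible to the caller.
    static void addSubscriptionToTxn(String topic, String subscription) throws Exception {
        try {
            addSubscriptionToTxnAsync(topic, subscription).get();
        } catch (ExecutionException e) {
            // Surface the underlying failure instead of the wrapper exception.
            Throwable cause = e.getCause();
            throw (cause instanceof Exception) ? (Exception) cause : new Exception(cause);
        }
    }

    public static void main(String[] args) throws Exception {
        addSubscriptionToTxn("persistent://public/default/my-topic", "my-sub");
        System.out.println("subscription registered");
    }
}
```

Without the `get()` call, a failed future is silently dropped and the `catch` block in the wrapper can never see the asynchronous error.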

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -176,6 +178,30 @@ public void addPublishPartitionToTxn(TxnID txnID, List<String> partitions) throw
         return handler.addPublishPartitionToTxnAsync(txnID, partitions);
     }
 
+    @Override
+    public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException {
+        try {
+            addSubscriptionToTxnAsync(txnID, topic, subscription);
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, String topic, String subscription) {
+        TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits());
+        if (handler == null) {
+            return FutureUtil.failedFuture(
+                    new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits()));
+        }
+        PulsarApi.Subscription sub = PulsarApi.Subscription.newBuilder()
+                .setTopic(topic)
+                .setSubscription(subscription)
+                .build();
+        return handler.addSubscriptionToTxn(txnID, Lists.newArrayList(sub));

Review comment:
       A singleton list is more suitable here. Alternatively, you could reuse a thread-local list to avoid creating many list instances.
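
A minimal sketch of the singleton-list suggestion, using a plain `String` in place of `PulsarApi.Subscription` (the class name `SingletonListSketch` and the `wrap` helper are hypothetical). `Collections.singletonList` returns an immutable one-element list, so this only fits if the callee does not try to modify the list it receives.

```java
import java.util.Collections;
import java.util.List;

final class SingletonListSketch {

    // Wraps a single element without allocating a growable ArrayList.
    // The returned list is immutable: add/remove throw UnsupportedOperationException.
    static List<String> wrap(String subscription) {
        return Collections.singletonList(subscription);
    }

    public static void main(String[] args) {
        List<String> subs = wrap("my-sub");
        System.out.println(subs.size() + " element: " + subs.get(0));
    }
}
```

A thread-local list would save the allocation as well, but it is only safe if the callee has finished reading the list before the same thread reuses it, which is worth checking for an asynchronous handler.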

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -1197,8 +1207,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
         // Reset txdID and position for cumulative ack.
         PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
         POSITION_UPDATER.set(this, null);
-        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>)
-                                                                    (List<?>)pendingAckMessageForCurrentTxn.values());
+//        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>)
+//                                                                    (List<?>)pendingAckMessageForCurrentTxn.values());

Review comment:
       Is this a useful change?

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.base.Objects;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A class for representing acked topic subscription info.
+ */
+@Data
+@Builder
+public class TransactionSubscription implements Comparable<TransactionSubscription> {

Review comment:
       PulsarApi.proto already defines an equivalent struct named `Subscription`; maybe you can use that one directly.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -207,26 +208,39 @@ public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
 
     private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
         CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
+        List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
         this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
             if (throwable != null) {
                 resultFuture.completeExceptionally(throwable);
                 return;
             }
+
+            txnMeta.ackedPartitions().forEach(tbSub -> {
+                CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
+                if (TxnStatus.COMMITTING.equals(newStatus)) {
+                    commitFuture = tbClient.commitTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                } else if (TxnStatus.ABORTING.equals(newStatus)) {
+                    commitFuture = tbClient.abortTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                }
+                completableFutureList.add(commitFuture);
+            });
+
             txnMeta.producedPartitions().forEach(partition -> {
                 CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
                 if (TxnStatus.COMMITTING.equals(newStatus)) {
                     commitFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
+                    // TODO commitTxnOnSubscription
                 } else if (TxnStatus.ABORTING.equals(newStatus)) {
+                    // TODO abortTxnOnTopic
                     commitFuture.completeExceptionally(new Throwable("Unsupported operation."));
-                } else {
-                    // Unsupported txnStatus
-                    commitFuture.completeExceptionally(new Throwable("Unsupported txnStatus."));

Review comment:
       Why remove this branch?
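
To illustrate why the branch may matter: in the hunk above, `commitFuture` starts as a new, incomplete `CompletableFuture`, so with the final `else` removed, any status other than COMMITTING or ABORTING would leave an incomplete future in the list. The sketch below uses hypothetical names (`EndTxnBranchSketch`, `Status`, `endOnPartition`) and `String` results instead of the Pulsar types; it is not the PR's code.

```java
import java.util.concurrent.CompletableFuture;

final class EndTxnBranchSketch {

    // Hypothetical, simplified status enum for the sketch.
    enum Status { OPEN, COMMITTING, ABORTING }

    static CompletableFuture<String> endOnPartition(String partition, Status newStatus) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (newStatus == Status.COMMITTING) {
            future.complete("committed " + partition);
        } else if (newStatus == Status.ABORTING) {
            future.complete("aborted " + partition);
        } else {
            // Without this branch the future would never complete,
            // and anything waiting on it would wait indefinitely.
            future.completeExceptionally(
                    new IllegalStateException("Unsupported txnStatus: " + newStatus));
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println(endOnPartition("partition-0", Status.COMMITTING).join());
        System.out.println(endOnPartition("partition-0", Status.OPEN).isCompletedExceptionally());
    }
}
```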



