Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/02 08:18:56 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #11357: [Transaction] Transaction coordinator fence mechanism.

congbobo184 commented on a change in pull request #11357:
URL: https://github.com/apache/pulsar/pull/11357#discussion_r700846016



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -400,13 +432,14 @@ public void endTransactionForTimeout(TxnID txnID) {
     }
 
     private static boolean isRetryableException(Throwable e) {
-        return e instanceof TransactionMetadataStoreStateException
+        return (e instanceof TransactionMetadataStoreStateException
                 || e instanceof RequestTimeoutException
                 || e instanceof ManagedLedgerException
                 || e instanceof BrokerPersistenceException
                 || e instanceof LookupException
                 || e instanceof ReachMaxPendingOpsException
-                || e instanceof ConnectException;
+                || e instanceof ConnectException)
+                && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);

Review comment:
       It is `ManagedLedgerException.ManagedLedgerNotFoundException`, and many more.
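The exclusion in the diff can be generalized. Below is a minimal Java sketch, assuming (as the reply suggests) that `ManagedLedgerNotFoundException` and similar subclasses should also be treated as non-retryable. The nested exception classes and the `RetryClassifier` name are hypothetical stand-ins for the real `ManagedLedgerException` hierarchy, and only two of the diff's retryable families are modeled.

```java
import java.net.ConnectException;
import java.util.List;

public class RetryClassifier {
    // Hypothetical stand-ins for the managed-ledger exception hierarchy.
    public static class ManagedLedgerException extends Exception {
        public ManagedLedgerException(String msg) { super(msg); }
    }

    public static class ManagedLedgerFencedException extends ManagedLedgerException {
        public ManagedLedgerFencedException() { super("fenced"); }
    }

    public static class ManagedLedgerNotFoundException extends ManagedLedgerException {
        public ManagedLedgerNotFoundException() { super("not found"); }
    }

    // Exception families treated as transient (illustrative subset of the diff's list).
    private static final List<Class<? extends Throwable>> RETRYABLE =
            List.of(ManagedLedgerException.class, ConnectException.class);

    // Subclasses of a retryable family that retrying cannot fix (not exhaustive).
    private static final List<Class<? extends Throwable>> NON_RETRYABLE =
            List.of(ManagedLedgerFencedException.class, ManagedLedgerNotFoundException.class);

    public static boolean isRetryableException(Throwable e) {
        boolean transientFamily = RETRYABLE.stream().anyMatch(c -> c.isInstance(e));
        boolean excluded = NON_RETRYABLE.stream().anyMatch(c -> c.isInstance(e));
        return transientFamily && !excluded;
    }
}
```

Keeping the exclusions in their own list makes it cheap to add further non-retryable subclasses later, instead of growing one boolean expression.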

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -84,94 +91,111 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
         this.tbClient = tbClient;
         this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
         this.transactionOpRetryTimer = timer;
+        this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
+        this.pendingConnectRequests = new ConcurrentLongHashMap<>();
     }
 
-    public void start() {
-        pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
-            @Override
-            public void onLoad(NamespaceBundle bundle) {
-                pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
-                    .whenComplete((topics, ex) -> {
-                        if (ex == null) {
-                            for (String topic : topics) {
-                                TopicName name = TopicName.get(topic);
-                                if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-                                        .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
-                                        && name.isPartitioned()) {
-                                    addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
-                                }
+    public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+                    .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()).thenCompose(v -> {
+                        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                        .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+                Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+                        .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
+                if (tcLoadSemaphore.tryAcquire()) {
+                    // this command may acquire the semaphore right after another command calls tcLoadSemaphore.release(),
+                    // so we should check again whether the store exists.
+                    if (stores.get(tcId) != null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+
+                    openTransactionMetadataStore(tcId).thenAccept((store) -> {
+                        stores.put(tcId, store);
+                        LOG.info("Added new transaction meta store {}", tcId);
+                        while (true) {
+                            CompletableFuture<Void> future = deque.poll();
+                            if (future != null) {
+                                // complete queue request future
+                                future.complete(null);
+                            } else {
+                                break;
                             }
-                        } else {
-                            LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.",
-                                    bundle, ex);
                         }
-                    });
-            }
-            @Override
-            public void unLoad(NamespaceBundle bundle) {
-                pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
-                    .whenComplete((topics, ex) -> {
-                        if (ex == null) {
-                            for (String topic : topics) {
-                                TopicName name = TopicName.get(topic);
-                                if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-                                        .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
-                                        && name.isPartitioned()) {
-                                    removeTransactionMetadataStore(
-                                            TransactionCoordinatorID.get(name.getPartitionIndex()));
-                                }
+
+                        completableFuture.complete(null);
+                        tcLoadSemaphore.release();
+                    }).exceptionally(e -> {
+                        completableFuture.completeExceptionally(e.getCause());
+                        // release before handling the request queue, in order to avoid an infinite client reconnect loop
+                        tcLoadSemaphore.release();
+
+                        while (true) {
+                            CompletableFuture<Void> future = deque.poll();
+                            if (future != null) {
+                                // this means that this TC client's connect request failed
+                                future.completeExceptionally(e);
+                            } else {
+                                break;
                             }
-                        } else {
-                            LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.",
-                                    bundle, ex);
                         }
-                     });
-            }
-            @Override
-            public boolean test(NamespaceBundle namespaceBundle) {
-                return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
-            }
-        });
-    }
-
-    public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        pulsarService.getBrokerService()
-                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId))
-                .whenComplete((v, e) -> {
-                    if (e != null) {
                         LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
-                    } else {
-                        TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
-                        TransactionRecoverTracker recoverTracker =
-                                new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
-                                        timeoutTracker, tcId.getId());
-                        transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
-                                timeoutTracker, recoverTracker)
-                                .whenComplete((store, ex) -> {
-                                    if (ex != null) {
-                                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
-                                    } else {
-                                        stores.put(tcId, store);
-                                        LOG.info("Added new transaction meta store {}", tcId);
-                                    }
-                                });
-                    }
-        });
-    }
-
-    public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        TransactionMetadataStore metadataStore = stores.remove(tcId);
-        if (metadataStore != null) {
-            metadataStore.closeAsync().whenComplete((v, ex) -> {
-                if (ex != null) {
-                    LOG.error("Close transaction metadata store with id " + tcId, ex);
+                        return null;
+                    });
                 } else {
-                    LOG.info("Removed and closed transaction meta store {}", tcId);
+                    // only one command can open the transaction metadata store;
+                    // the others are added to the deque, and when openTransactionMetadataStore finishes,
+                    // the requests in the queue are handled
+                    deque.add(completableFuture);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
+                    }
                 }
+                return completableFuture;
             });
         }
     }
 
+    public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
+        return pulsarService.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
+                        .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
+                            TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
+                            TransactionRecoverTracker recoverTracker =
+                                    new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+                                    timeoutTracker, tcId.getId());
+                            return transactionMetadataStoreProvider
+                                    .openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
+                                            timeoutTracker, recoverTracker);
+                });
+    }
+
+    public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
+        final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+        if (tcLoadSemaphore.tryAcquire()) {

Review comment:
       `removeTransactionMetadataStore` may be called concurrently. We only allow one remove operation at a time, so we don't need to wait or reschedule a retry.
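The reasoning in this comment (one remove per coordinator at a time, with losing callers returning immediately) maps onto a non-blocking `Semaphore.tryAcquire()`. Below is a minimal sketch, assuming a generic store type `S` and a hypothetical asynchronous `closer` in place of `TransactionMetadataStore` and its close. What a losing caller returns (here `false`) is an assumption, since the diff is cut off right after `tryAcquire()`. In the diff, the same `tcLoadSemaphores` map guards both loading and removal, whereas this sketch models only removal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical model of a remove guarded by a per-id semaphore; S stands in for
// TransactionMetadataStore and `closer` for its asynchronous close.
public class GuardedRemover<S> {
    private final ConcurrentHashMap<Long, S> stores = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, Semaphore> semaphores = new ConcurrentHashMap<>();
    private final Function<S, CompletableFuture<Void>> closer;

    public GuardedRemover(Function<S, CompletableFuture<Void>> closer) {
        this.closer = closer; // expected to report failures through the returned future
    }

    public void put(long tcId, S store) {
        stores.put(tcId, store);
    }

    // Completes with true if this call removed and closed a store, false otherwise.
    public CompletableFuture<Boolean> removeStore(long tcId) {
        Semaphore sem = semaphores.computeIfAbsent(tcId, id -> new Semaphore(1));
        if (!sem.tryAcquire()) {
            // Another remove for this id holds the permit: don't block or reschedule.
            return CompletableFuture.completedFuture(false);
        }
        S store = stores.remove(tcId);
        if (store == null) {
            sem.release(); // nothing to close, release immediately
            return CompletableFuture.completedFuture(false);
        }
        return closer.apply(store)
                .whenComplete((v, err) -> sem.release()) // release on success and failure
                .thenApply(v -> true);
    }
}
```

Because the permit is released in `whenComplete`, it is returned whether the close succeeds or fails; `closer` is assumed to report failures through its future rather than by throwing.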

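The `handleTcClientConnect` change in the second hunk lets one request open a coordinator's store while concurrent requests for the same `tcId` wait and share the result. The sketch below shows that coalescing idea with a swapped-in technique: a shared in-flight `CompletableFuture` per id instead of a semaphore plus a pending deque. It is not the PR's code; `CoalescingStoreLoader`, `isLoaded`, and the `opener` function (standing in for `openTransactionMetadataStore`) are hypothetical, and the opener is assumed to complete with a non-null store or fail through its future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical, simplified model: S stands in for TransactionMetadataStore and
// `opener` for openTransactionMetadataStore(tcId).
public class CoalescingStoreLoader<S> {
    private final ConcurrentHashMap<Long, S> stores = new ConcurrentHashMap<>();
    // One in-flight open per coordinator id, shared by every concurrent connect.
    private final ConcurrentHashMap<Long, CompletableFuture<S>> inFlight = new ConcurrentHashMap<>();
    private final LongFunction<CompletableFuture<S>> opener;

    public CoalescingStoreLoader(LongFunction<CompletableFuture<S>> opener) {
        this.opener = opener; // expected to complete with a non-null store or fail
    }

    public boolean isLoaded(long tcId) {
        return stores.containsKey(tcId);
    }

    public CompletableFuture<Void> connect(long tcId) {
        if (stores.containsKey(tcId)) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<S> mine = new CompletableFuture<>();
        CompletableFuture<S> existing = inFlight.putIfAbsent(tcId, mine);
        if (existing != null) {
            // Another connect is already opening this store: share its outcome.
            return existing.thenApply(s -> null);
        }
        // This call is the single loader; re-check in case a load finished in between.
        S already = stores.get(tcId);
        CompletableFuture<S> load = already != null
                ? CompletableFuture.completedFuture(already)
                : opener.apply(tcId);
        load.whenComplete((store, err) -> {
            if (err == null) {
                stores.put(tcId, store);
            }
            // Clear the marker on success and failure so a failed open can be retried.
            inFlight.remove(tcId, mine);
            if (err == null) {
                mine.complete(store);
            } else {
                mine.completeExceptionally(err);
            }
        });
        return mine.thenApply(s -> null);
    }
}
```

A shared future avoids two hazards that a semaphore-plus-deque design has to handle by hand: every exit path of the loader must release the permit (including an early return when the store already exists), and a request queued just after the deque is drained must still be completed.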



-- 
This is an automated message from the Apache Git Service.