You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/25 07:57:48 UTC

[pulsar] branch master updated: Remove unused start method in tx-metadata-store. (#15286)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c4b68f586 Remove unused start method in tx-metadata-store. (#15286)
35c4b68f586 is described below

commit 35c4b68f586774d0e2b7a3a2a6ab1d6a20ac1452
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Apr 25 15:57:38 2022 +0800

    Remove unused start method in tx-metadata-store. (#15286)
---
 .../broker/TransactionMetadataStoreService.java    | 85 ++++------------------
 1 file changed, 15 insertions(+), 70 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 569f5900342..8aff792efec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
 import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
@@ -54,10 +53,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClientException
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -114,57 +110,6 @@ public class TransactionMetadataStoreService {
         this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
-    @Deprecated
-    public void start() {
-        pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
-
-            @Override
-            public void onLoad(NamespaceBundle bundle) {
-                pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
-                        .whenComplete((topics, ex) -> {
-                            if (ex == null) {
-                                for (String topic : topics) {
-                                    TopicName name = TopicName.get(topic);
-                                    if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-                                            .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
-                                            && name.isPartitioned()) {
-                                        handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
-                                    }
-                                }
-                            } else {
-                                LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.",
-                                        bundle, ex);
-                            }
-                        });
-            }
-
-            @Override
-            public void unLoad(NamespaceBundle bundle) {
-                pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
-                        .whenComplete((topics, ex) -> {
-                            if (ex == null) {
-                                for (String topic : topics) {
-                                    TopicName name = TopicName.get(topic);
-                                    if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-                                            .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
-                                            && name.isPartitioned()) {
-                                        removeTransactionMetadataStore(
-                                                TransactionCoordinatorID.get(name.getPartitionIndex()));
-                                    }
-                                }
-                            } else {
-                                LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.",
-                                        bundle, ex);
-                            }
-                        });
-            }
-            @Override
-            public boolean test(NamespaceBundle namespaceBundle) {
-                return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
-            }
-        });
-    }
-
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
@@ -381,16 +326,15 @@ public class TransactionMetadataStoreService {
                         future.complete(null);
                         return;
                     }
-                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
-                    if (!isRetryableException(realCause)) {
+                    if (!isRetryableException(ex)) {
                         LOG.error("End transaction fail! TxnId : {}, "
-                                + "TxnAction : {}", txnID, txnAction, realCause);
+                                + "TxnAction : {}", txnID, txnAction, ex);
                         future.completeExceptionally(ex);
                         return;
                     }
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
-                                + "TxnAction : {}", txnID, txnAction, realCause);
+                                + "TxnAction : {}", txnID, txnAction, ex);
                     }
                     transactionOpRetryTimer.newTimeout(timeout ->
                                     endTransaction(txnID, txnAction, isTimeout, future),
@@ -436,7 +380,7 @@ public class TransactionMetadataStoreService {
                 return null;
             }
         }).exceptionally(e -> {
-            if (isRetryableException(e.getCause())) {
+            if (isRetryableException(e)) {
                 endTransaction(txnID, TxnAction.ABORT_VALUE, true);
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -487,15 +431,16 @@ public class TransactionMetadataStoreService {
                 });
     }
 
-    private static boolean isRetryableException(Throwable e) {
-        return (e instanceof TransactionMetadataStoreStateException
-                || e instanceof RequestTimeoutException
-                || e instanceof ManagedLedgerException
-                || e instanceof BrokerPersistenceException
-                || e instanceof LookupException
-                || e instanceof ReachMaxPendingOpsException
-                || e instanceof ConnectException)
-                && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);
+    private static boolean isRetryableException(Throwable ex) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        return (realCause instanceof TransactionMetadataStoreStateException
+                || realCause instanceof RequestTimeoutException
+                || realCause instanceof ManagedLedgerException
+                || realCause instanceof BrokerPersistenceException
+                || realCause instanceof LookupException
+                || realCause instanceof ReachMaxPendingOpsException
+                || realCause instanceof ConnectException)
+                && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException);
     }
 
     private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
@@ -517,7 +462,7 @@ public class TransactionMetadataStoreService {
         return Collections.unmodifiableMap(stores);
     }
 
-    public synchronized void close () {
+    public void close () {
         this.internalPinnedExecutor.shutdown();
         stores.forEach((tcId, metadataStore) -> {
             metadataStore.closeAsync().whenComplete((v, ex) -> {