You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/25 07:57:48 UTC
[pulsar] branch master updated: Remove unused start method in tx-metadata-store. (#15286)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35c4b68f586 Remove unused start method in tx-metadata-store. (#15286)
35c4b68f586 is described below
commit 35c4b68f586774d0e2b7a3a2a6ab1d6a20ac1452
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Apr 25 15:57:38 2022 +0800
Remove unused start method in tx-metadata-store. (#15286)
---
.../broker/TransactionMetadataStoreService.java | 85 ++++------------------
1 file changed, 15 insertions(+), 70 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 569f5900342..8aff792efec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
@@ -54,10 +53,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClientException
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -114,57 +110,6 @@ public class TransactionMetadataStoreService {
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}
- @Deprecated
- public void start() {
- pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
-
- @Override
- public void onLoad(NamespaceBundle bundle) {
- pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
- .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
- handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
- }
- } else {
- LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.",
- bundle, ex);
- }
- });
- }
-
- @Override
- public void unLoad(NamespaceBundle bundle) {
- pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
- .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
- removeTransactionMetadataStore(
- TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
- }
- } else {
- LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.",
- bundle, ex);
- }
- });
- }
- @Override
- public boolean test(NamespaceBundle namespaceBundle) {
- return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
- }
- });
- }
-
public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
@@ -381,16 +326,15 @@ public class TransactionMetadataStoreService {
future.complete(null);
return;
}
- Throwable realCause = FutureUtil.unwrapCompletionException(ex);
- if (!isRetryableException(realCause)) {
+ if (!isRetryableException(ex)) {
LOG.error("End transaction fail! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, realCause);
+ + "TxnAction : {}", txnID, txnAction, ex);
future.completeExceptionally(ex);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, realCause);
+ + "TxnAction : {}", txnID, txnAction, ex);
}
transactionOpRetryTimer.newTimeout(timeout ->
endTransaction(txnID, txnAction, isTimeout, future),
@@ -436,7 +380,7 @@ public class TransactionMetadataStoreService {
return null;
}
}).exceptionally(e -> {
- if (isRetryableException(e.getCause())) {
+ if (isRetryableException(e)) {
endTransaction(txnID, TxnAction.ABORT_VALUE, true);
} else {
if (LOG.isDebugEnabled()) {
@@ -487,15 +431,16 @@ public class TransactionMetadataStoreService {
});
}
- private static boolean isRetryableException(Throwable e) {
- return (e instanceof TransactionMetadataStoreStateException
- || e instanceof RequestTimeoutException
- || e instanceof ManagedLedgerException
- || e instanceof BrokerPersistenceException
- || e instanceof LookupException
- || e instanceof ReachMaxPendingOpsException
- || e instanceof ConnectException)
- && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);
+ private static boolean isRetryableException(Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ return (realCause instanceof TransactionMetadataStoreStateException
+ || realCause instanceof RequestTimeoutException
+ || realCause instanceof ManagedLedgerException
+ || realCause instanceof BrokerPersistenceException
+ || realCause instanceof LookupException
+ || realCause instanceof ReachMaxPendingOpsException
+ || realCause instanceof ConnectException)
+ && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException);
}
private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
@@ -517,7 +462,7 @@ public class TransactionMetadataStoreService {
return Collections.unmodifiableMap(stores);
}
- public synchronized void close () {
+ public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
metadataStore.closeAsync().whenComplete((v, ex) -> {