You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/14 11:34:02 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #11357: [Transaction] Transaction coordinator fence mechanism.

eolivelli commented on a change in pull request #11357:
URL: https://github.com/apache/pulsar/pull/11357#discussion_r708173091



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -691,7 +692,11 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
-            internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
+            if (topicName.toString().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())) {

Review comment:
       we should have a better way to detect a system topic, IIRC we added something about this

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -66,11 +75,17 @@ protected void cleanup() throws Exception {
     }
 
     @Test
-    public void testAddAndRemoveTransactionMetadataStore() {
+    public void testCloseLock() {

Review comment:
       remove empty test case ?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
##########
@@ -396,6 +396,7 @@ private static void verifyCoordinatorStats(String state,
     private void initTransaction(int coordinatorSize) throws Exception {
         admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize);
         admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();

Review comment:
       please close this client, otherwise the new variable below will override the reference and we will leak a PulsarClient

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -400,13 +432,14 @@ public void endTransactionForTimeout(TxnID txnID) {
     }
 
     private static boolean isRetryableException(Throwable e) {
-        return e instanceof TransactionMetadataStoreStateException
+        return (e instanceof TransactionMetadataStoreStateException
                 || e instanceof RequestTimeoutException
                 || e instanceof ManagedLedgerException
                 || e instanceof BrokerPersistenceException
                 || e instanceof LookupException
                 || e instanceof ReachMaxPendingOpsException
-                || e instanceof ConnectException;
+                || e instanceof ConnectException)
+                && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);

Review comment:
       we should add some "isRetryable" method to ManagedLedgerException
   otherwise this code will be hard to be maintained 
   
   cc @merlimat 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1680,6 +1682,17 @@ public boolean isTopicNsOwnedByBroker(TopicName topicName) {
                                 : CompletableFuture.completedFuture(null)));
             }
         });
+        if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
+                && serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
+            TransactionMetadataStoreService metadataStoreService =
+                    this.getPulsar().getTransactionMetadataStoreService();
+            // if have store belongs to this bundle, remove and close the store

Review comment:
       typo "if the store" and not "if have store"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org