You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/13 13:00:56 UTC

[GitHub] [pulsar] gaoran10 opened a new pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

gaoran10 opened a new pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563


   ### Motivation
   
   Currently, the transaction metadata handlers start with pulsar client start, but the handlers connect with the broker asynchronously, if the client restart, the metadata handler may not be available.
   
   ### Modifications
   
   Add the connection future for the metadata handler.
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *org.apache.pulsar.client.impl.TransactionEndToEndTest#txnMetadataHandlerRecoverTest*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563#issuecomment-727149745


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] reswqa commented on pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

Posted by GitBox <gi...@apache.org>.
reswqa commented on pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563#issuecomment-727172106


   Thanks @gaoran10 for fixing this problem so quickly, this is very valuable to me.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563#discussion_r522951764



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -75,34 +76,48 @@ public void start() throws TransactionCoordinatorClientException {
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
             return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
-                .thenAccept(partitionMeta -> {
+                .thenCompose(partitionMeta -> {
+                    List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions);
                     }
                     if (partitionMeta.partitions > 0) {
                         handlers = new TransactionMetaStoreHandler[partitionMeta.partitions];
                         for (int i = 0; i < partitionMeta.partitions; i++) {
-                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(i, pulsarClient,
-                                    TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+                            CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                            connectFutureList.add(connectFuture);
+                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                                    i, pulsarClient, getTCAssignTopicName(i), connectFuture);
                             handlers[i] = handler;
                             handlerMap.put(i, handler);
                         }
                     } else {
                         handlers = new TransactionMetaStoreHandler[1];
+                        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                        connectFutureList.add(connectFuture);
                         TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(0, pulsarClient,
-                                TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                                getTCAssignTopicName(-1), connectFuture);
                         handlers[0] = handler;
                         handlerMap.put(0, handler);
                     }
 
                     STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY);
 
+                    return FutureUtil.waitForAll(connectFutureList);
                 });
         } else {
             return FutureUtil.failedFuture(new CoordinatorClientStateException("Can not start while current state is " + state));
         }
     }
 
+    private String getTCAssignTopicName(int partition) {
+        if (partition > 0) {

Review comment:
       Ok, I'll fix this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563#issuecomment-727123031


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8563: [Transaction] Guarantee transaction metadata handlers connect

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563#discussion_r522950492



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -75,34 +76,48 @@ public void start() throws TransactionCoordinatorClientException {
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
             return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
-                .thenAccept(partitionMeta -> {
+                .thenCompose(partitionMeta -> {
+                    List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions);
                     }
                     if (partitionMeta.partitions > 0) {
                         handlers = new TransactionMetaStoreHandler[partitionMeta.partitions];
                         for (int i = 0; i < partitionMeta.partitions; i++) {
-                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(i, pulsarClient,
-                                    TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+                            CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                            connectFutureList.add(connectFuture);
+                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                                    i, pulsarClient, getTCAssignTopicName(i), connectFuture);
                             handlers[i] = handler;
                             handlerMap.put(i, handler);
                         }
                     } else {
                         handlers = new TransactionMetaStoreHandler[1];
+                        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                        connectFutureList.add(connectFuture);
                         TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(0, pulsarClient,
-                                TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                                getTCAssignTopicName(-1), connectFuture);
                         handlers[0] = handler;
                         handlerMap.put(0, handler);
                     }
 
                     STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY);
 
+                    return FutureUtil.waitForAll(connectFutureList);
                 });
         } else {
             return FutureUtil.failedFuture(new CoordinatorClientStateException("Can not start while current state is " + state));
         }
     }
 
+    private String getTCAssignTopicName(int partition) {
+        if (partition > 0) {

Review comment:
       ```suggestion
           if (partition >= 0) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org