Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/04 13:06:46 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

liangyepianzhou opened a new pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135


   ### Motivation
   We should not throw TransactionCoordinatorNotFound to the client, because that error is normal behavior that the client can recover from by reconnecting.
   ### Modifications
   Try the operation again when the client receives a TransactionCoordinatorNotFound.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     (Please explain why)
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r764043726



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -206,8 +210,25 @@ void handleNewTxnResponse(CommandNewTxnResponse response) {
             }
             op.callback.complete(txnID);
         } else {
-            LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {

Review comment:
       Should we retry sending the command only after the TC client has connected to the TC? I think it would be better to move this to `TransactionMetaStoreHandler.connectionOpened`.







[GitHub] [pulsar] eolivelli commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r767102340



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1997,7 +1997,31 @@ private boolean checkTransactionEnableAndSendError(long requestId) {
             return true;
         }
     }
+    private Throwable handleException(Throwable ex, String op, long requestId) {

Review comment:
       This is only for transactions; can you please refer to transactions in the method name?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionRetryTest.java
##########
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.junit.Assert.fail;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test
+public class TransactionRetryTest extends TransactionTestBase {
+
+    private static final int TOPIC_PARTITION = 3;
+    private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
+    private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
+    private static final int NUM_PARTITIONS = 16;
+    @BeforeMethod
+    protected void setup() throws Exception {
+        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() {
+        super.internalCleanup();
+    }
+
+
+    @Test
+    public void testTryNewTxnAgainWhenTCNotReadyOrConnecting () throws Exception {
+        Callable<CompletableFuture<Transaction>> callable = ()
+                -> {
+            try {
+                return pulsarClient
+                        .newTransaction()
+                        .withTransactionTimeout(5, TimeUnit.SECONDS)
+                        .build();
+            } catch (PulsarClientException e) {
+                e.printStackTrace();

Review comment:
       nit: use a logger instead of `e.printStackTrace()`










[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r765682056



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -2022,11 +2034,9 @@ protected void handleNewTxn(CommandNewTxn command) {
                     ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(),
                             txnID.getMostSigBits()));
                 } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response error for new txn request {}", requestId, ex);
-                    }
+                    ex = handleException(ex, "new txn", requestId);

Review comment:
       It would be better to use org.apache.pulsar.common.api.proto.BaseCommand.NEW_TXN here instead of the string literal.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1997,7 +1997,19 @@ private boolean checkTransactionEnableAndSendError(long requestId) {
             return true;
         }
     }
+    private Throwable handleException(Throwable ex, String op, long requestId) {
+        if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+            return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage());
 
+        }
+        if (TransactionMetadataStoreService.isRetryableException(ex) || ex.getCause() != null
+                && TransactionMetadataStoreService.isRetryableException(ex.getCause())) {
+            return new CoordinatorException.TcOperationRetryException(ex.getMessage());
+        } else {
+            log.error("Send response error for {} request {}.", op, requestId, ex);

Review comment:
       When a CoordinatorNotFoundException is produced, this will print a log.

##########
File path: pulsar-client-cpp/include/pulsar/Result.h
##########
@@ -88,6 +88,7 @@ enum Result
     ResultProducerFenced,                            /// Producer was fenced by broker
 
     ResultMemoryBufferIsFull,  /// Client-wide memory limit has been reached
+    ResultRetryTcOpAgain       /// Get a exception which need client to retry the TC`s operation again

Review comment:
       TcOperationRetry

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -2025,8 +2025,15 @@ protected void handleNewTxn(CommandNewTxn command) {
                     if (log.isDebugEnabled()) {
                         log.debug("Send response error for new txn request {}", requestId, ex);
                     }
+                    if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                        ex = new CoordinatorException
+                                .CoordinatorNotFoundException(ex.getMessage());
+                    } else if (TransactionMetadataStoreService.isRetryableException(ex) || ex.getCause() != null
+                            && TransactionMetadataStoreService.isRetryableException(ex.getCause())) {
+                        ex = new CoordinatorException.TcOperationRetryException(ex.getMessage());
+                    }
 
-                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
+                   ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),

Review comment:
       Please fix the code formatting (indentation).

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -2025,8 +2025,15 @@ protected void handleNewTxn(CommandNewTxn command) {
                     if (log.isDebugEnabled()) {
                         log.debug("Send response error for new txn request {}", requestId, ex);
                     }
+                    if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException) {

Review comment:
       Also check `ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException`.
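
The check requested above can be sketched as a small helper that tests both the exception and its direct cause. This is only an illustration: `CauseCheckSketch`, `FencedException`, and `isOrIsCausedBy` are hypothetical names, and `FencedException` merely stands in for `ManagedLedgerException.ManagedLedgerFencedException`; none of this is taken from the PR.

```java
import java.util.concurrent.CompletionException;

final class CauseCheckSketch {
    /** Hypothetical stand-in for ManagedLedgerException.ManagedLedgerFencedException. */
    static class FencedException extends Exception {
        FencedException(String message) {
            super(message);
        }
    }

    /** Returns true if ex itself, or its direct cause, is an instance of type. */
    static boolean isOrIsCausedBy(Throwable ex, Class<? extends Throwable> type) {
        if (ex == null) {
            return false;
        }
        // Class.isInstance(null) is false, so a missing cause is handled safely.
        return type.isInstance(ex) || type.isInstance(ex.getCause());
    }

    public static void main(String[] args) {
        // Futures commonly wrap the real failure, which is why the cause matters.
        Throwable wrapped = new CompletionException(new FencedException("ledger fenced"));
        System.out.println(isOrIsCausedBy(wrapped, FencedException.class));
        System.out.println(isOrIsCausedBy(new RuntimeException("other"), FencedException.class));
    }
}
```

A helper like this would also remove the repeated `ex.getCause() != null && ...` pattern that appears in the retryable-exception branch of the diff.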

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -181,7 +207,10 @@ private void failPendingRequest() {
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout));
-        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback);
+        OpForNewTxnCallBack op = OpForNewTxnCallBack.create(cmd, callback, timeout, unit, client);
+        if (checkIfConnecting(requestId, op)) {

Review comment:
       This can be moved into `canSendRequest`.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -206,8 +235,27 @@ void handleNewTxnResponse(CommandNewTxnResponse response) {
             }
             op.callback.complete(txnID);
         } else {
-            LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound
+                    || response.getError() == ServerError.RetryTcOpAgain) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId,
+                            op.unit.toMillis(op.timeout));
+                    OpForNewTxnCallBack opNew = OpForNewTxnCallBack.create(cmd, op.callback, op.timeout,

Review comment:
       The backoff does not seem to be used.
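
For context, a retry delay is normally drawn from a backoff object so that repeated attempts wait progressively longer. The sketch below shows one way that could look; `BackoffSketch` is a hypothetical class written for this illustration and is not Pulsar's own backoff implementation, and the initial and maximum delays are assumed values.

```java
final class BackoffSketch {
    private final long maxMillis;
    private long nextMillis;

    BackoffSketch(long initialMillis, long maxMillis) {
        this.nextMillis = initialMillis;
        this.maxMillis = maxMillis;
    }

    /** Returns the delay for the upcoming retry and doubles the following one, capped at maxMillis. */
    long next() {
        long current = nextMillis;
        nextMillis = Math.min(nextMillis * 2, maxMillis);
        return current;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(100, 1000);
        for (int attempt = 1; attempt <= 5; attempt++) {
            // Each scheduled retry should use backoff.next() as its delay.
            System.out.println("attempt " + attempt + " waits " + backoff.next() + " ms");
        }
    }
}
```

The point of the comment is that the value returned by `next()` has to be passed as the delay when the retry is scheduled; otherwise the op carries a backoff that never influences timing.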

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,17 +440,61 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound
+                    || response.getError() == ServerError.RetryTcOpAgain) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action, client);
+                    ReferenceCountUtil.safeRelease(op.cmd);
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
 
+        }
         onResponse(op);
     }
 
+    private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) {
+        ClientCnx cnx = cnx();
+        if (cnx == null) {
+            timer.newTimeout(timeout ->
+                    tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        op.cmd.retain();
+        cnx.ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());
+    }
+
     private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {

Review comment:
       It would be better to return a boolean indicating whether to retry.
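
A rough sketch of that shape follows, assuming the failure handler decides in one place whether the error is retryable and only fails the callback when it is not. The `ServerError` enum here is a local stand-in with just the values named in the diff, and `handleFailure` and `RetryDecisionSketch` are hypothetical names, not the PR's code.

```java
import java.util.concurrent.CompletableFuture;

final class RetryDecisionSketch {
    /** Local stand-in for the protocol's ServerError enum; only values used here. */
    enum ServerError { TransactionCoordinatorNotFound, RetryTcOpAgain, UnknownError }

    /**
     * Returns true if the caller should retry the operation.
     * For non-retryable errors the callback is failed and false is returned.
     */
    static boolean handleFailure(ServerError error, String message, CompletableFuture<?> callback) {
        if (error == ServerError.TransactionCoordinatorNotFound
                || error == ServerError.RetryTcOpAgain) {
            return true; // leave the callback pending; the caller schedules a retry
        }
        callback.completeExceptionally(new IllegalStateException(error + ": " + message));
        return false;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> callback = new CompletableFuture<>();
        boolean retry = handleFailure(ServerError.RetryTcOpAgain, "coordinator busy", callback);
        System.out.println("retry=" + retry + ", callbackDone=" + callback.isDone());
    }
}
```

With this shape each response handler reduces to `if (handleFailure(...)) { scheduleRetry(); return; }`, which avoids repeating the error comparison in every handler.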







[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r766602200



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -432,38 +705,43 @@ private void onResponse(OpBase<?> op) {
         semaphore.release();
     }
 
-    private boolean canSendRequest(CompletableFuture<?> callback) {
-        if (!isValidHandlerState(callback)) {
-            return false;
-        }
+    private void canSendRequestNow(CompletableFuture<?> callback, long requestId, OpBase<?> op) {
         try {
             if (blockIfReachMaxPendingOps) {
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
                     callback.completeExceptionally(new TransactionCoordinatorClientException("Reach max pending ops."));
-                    return false;
+                    ReferenceCountUtil.safeRelease(op.cmd);
+                    op.recycle();
+                    return;
                 }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             callback.completeExceptionally(TransactionCoordinatorClientException.unwrap(e));
-            return false;
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+            return;
+        }
+        if (!checkStateAndSendRequest(callback, requestId, op)) {
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();

Review comment:
       semaphore.release();
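
The concern behind this comment is that a permit acquired before the send must be returned on every path where the request is not actually sent. A minimal sketch of that invariant, using only `java.util.concurrent.Semaphore`; `PermitReleaseSketch` and its methods are hypothetical names and the send step is modeled as a `BooleanSupplier`:

```java
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;

final class PermitReleaseSketch {
    private final Semaphore semaphore;

    PermitReleaseSketch(int maxPendingOps) {
        this.semaphore = new Semaphore(maxPendingOps);
    }

    /** Returns true if the request was sent; the permit is then held until onResponse(). */
    boolean trySend(BooleanSupplier sendRequest) {
        if (!semaphore.tryAcquire()) {
            return false; // too many pending ops, nothing to release
        }
        if (!sendRequest.getAsBoolean()) {
            semaphore.release(); // the send did not happen, so give the permit back
            return false;
        }
        return true;
    }

    /** Called when a response (success or final failure) arrives for a sent request. */
    void onResponse() {
        semaphore.release();
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        PermitReleaseSketch handler = new PermitReleaseSketch(1);
        handler.trySend(() -> false); // failed send must not leak the permit
        System.out.println("permits after failed send: " + handler.availablePermits());
    }
}
```

Without the release on the failed-send path, each rejected request would permanently reduce the number of operations the handler can have in flight.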







[GitHub] [pulsar] liangyepianzhou commented on pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#issuecomment-991893615


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r767536609



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -499,7 +499,7 @@ public void endTransactionForTimeout(TxnID txnID) {
         return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
     }
 
-    private static boolean isRetryableException(Throwable e) {
+    public static boolean isRetryableException(Throwable e) {

Review comment:
       Should be private? 

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -60,6 +63,10 @@
     private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
+    private final ConcurrentLongHashMap<OpBase<?>> waitingExecutedRequests =

Review comment:
       Can we reuse `pendingRequests`? We would remove the op from `pendingRequests` only if the request succeeds or encounters a non-recoverable exception; after the connection is opened, we can just flush out `pendingRequests` again.
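
The idea in this comment can be sketched as follows: keep every op in a single map until it succeeds or fails for good, and resend whatever is still in the map when the connection opens. This is a single-threaded illustration with commands modeled as strings; `PendingRequestsSketch`, its methods, and the `sentCommands` list (standing in for the network connection) are all assumptions made for the example, not the handler's real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRequestsSketch {
    private final Map<Long, String> pendingRequests = new ConcurrentHashMap<>();
    private final List<String> sentCommands = new ArrayList<>(); // stands in for the connection
    private boolean connected;

    /** Always records the op; writes it out only if a connection is available. */
    void send(long requestId, String command) {
        pendingRequests.put(requestId, command);
        if (connected) {
            sentCommands.add(command);
        }
    }

    void connectionClosed() {
        connected = false;
    }

    /** On (re)connect, flush every op that has not finished yet. */
    void connectionOpened() {
        connected = true;
        pendingRequests.values().forEach(sentCommands::add);
    }

    /** Ops leave the map only on success or on a non-recoverable error. */
    void onResponse(long requestId, boolean success, boolean recoverable) {
        if (success || !recoverable) {
            pendingRequests.remove(requestId);
        }
    }

    int pendingCount() {
        return pendingRequests.size();
    }

    int sentCount() {
        return sentCommands.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch handler = new PendingRequestsSketch();
        handler.send(1L, "NEW_TXN"); // queued while disconnected
        handler.connectionOpened();  // flushed on connect
        System.out.println("pending=" + handler.pendingCount() + ", sent=" + handler.sentCount());
    }
}
```

Under this design a second map such as `waitingExecutedRequests` would not be needed, at the cost of having to make resends safe for ops whose first attempt may already have reached the broker.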

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,67 +400,289 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
+            if (checkIfNeedRetryByError(response.getError(), response.getMessage(), op)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for the {} request {} error "
+                                    + "TransactionCoordinatorNotFound and try it again",
+                            BaseCommand.Type.END_TXN.name(), response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    ByteBuf cmd = Commands.serializeWithSize(Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action));
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack.create(cmd, op.callback, txnID, action,
+                                    op.backoff);
+                    onNewRequest(op, opNew, requestId);

Review comment:
       Is the reason for creating a new op to use a different request ID? Do you see any problem with reusing the old request ID?







[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r766675485



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -158,6 +178,12 @@ public void connectionOpened(ClientCnx cnx) {
         }
     }
 
+    private void executedWaitingRequests() {
+        waitingExecutedRequests.forEach((requestId, op) -> {
+            tryInternalExecuteCommandAgain(op, requestId);

Review comment:
       The queue should be cleared here.







[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r763747147



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {
+                LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
+            }
         }
-
         onResponse(op);
     }
 
+    private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) {
+        if (cnx() == null) {
+            timer.newTimeout(timeout ->
+                    tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());
+    }
+
     private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
         if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {

Review comment:
       ```
   if (error == ServerError.TransactionCoordinatorNotFound) {
       if (getState() != State.Connecting) {
           connectionHandler.reconnectLater(new TransactionCoordinatorClientException
                   .CoordinatorNotFoundException(message));
       }
       return;
   }
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {

Review comment:
       Since this branch already returns, the `else` is not needed.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {
+                LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
+            }
         }
-
         onResponse(op);
     }
 
+    private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) {
+        if (cnx() == null) {
+            timer.newTimeout(timeout ->
+                    tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());

Review comment:
       This should use a local variable; otherwise, calling `cnx()` again after the null check can produce an NPE. Something like this:
   ```
   ClientCnx cnx = cnx();
   if (cnx == null) {
       timer.newTimeout(timeout ->
               tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
       return;
   }
   pendingRequests.put(requestId, op);
   timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
   cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise());
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {
+                LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
+            }
         }
-
         onResponse(op);
     }
 
+    private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) {
+        if (cnx() == null) {
+            timer.newTimeout(timeout ->
+                    tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());
+    }
+
     private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
         if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {
             connectionHandler.reconnectLater(new TransactionCoordinatorClientException
                     .CoordinatorNotFoundException(message));
         }
 
-        if (op != null) {
+        if (op != null && error != ServerError.TransactionCoordinatorNotFound) {
             op.callback.completeExceptionally(getExceptionByServerError(error, message));
         }
     }
 
     private static abstract class OpBase<T> {
         protected ByteBuf cmd;
         protected CompletableFuture<T> callback;
+        protected Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 3, TimeUnit.SECONDS, 10,
+                TimeUnit.SECONDS);
 
         abstract void recycle();
     }
 
-    private static class OpForTxnIdCallBack extends OpBase<TxnID> {
+    private static class OpForNewTxnCallBack extends OpBase<TxnID> {
 
-        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback) {
-            OpForTxnIdCallBack op = RECYCLER.get();
+        protected long timeout;
+        protected TimeUnit unit;
+        static OpForNewTxnCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, long timeout, TimeUnit unit) {
+            OpForNewTxnCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
+            op.timeout = timeout;
+            op.unit = unit;

Review comment:
       Every field needs to be reset to its initial value, as in `org.apache.bookkeeper.mledger.impl.OpReadEntry.recycle()`.
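
To illustrate the point about resetting fields, here is a minimal sketch of a pooled operation object. It rests on assumptions: `PooledOp` and its `ArrayDeque` pool are hypothetical names standing in for the `RECYCLER` used in the diff, the field set is only an example, and the pool is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical pooled operation; the ArrayDeque pool is a stand-in for a real recycler.
final class PooledOp {
    private static final ArrayDeque<PooledOp> POOL = new ArrayDeque<>();

    Object callback;
    long timeout;
    TimeUnit unit;

    private PooledOp() {
    }

    // Reuse a pooled instance when one is available, otherwise allocate.
    static PooledOp create(Object callback, long timeout, TimeUnit unit) {
        PooledOp op = POOL.poll();
        if (op == null) {
            op = new PooledOp();
        }
        op.callback = callback;
        op.timeout = timeout;
        op.unit = unit;
        return op;
    }

    // Reset every field before returning the instance to the pool, so a
    // later create() can never observe state left over from a previous use.
    void recycle() {
        callback = null;
        timeout = 0L;
        unit = null;
        POOL.push(this);
    }
}
```

If a field such as `timeout` or `unit` were left out of `recycle()`, a reused instance would silently carry the old value until `create()` overwrote it, which is easy to miss when new fields are added later.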

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -295,6 +340,28 @@ public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnRespon
             LOG.error("Add subscription to txn failed for request {} error {}.",
                     response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    List<Subscription> subscriptionList = op.subscriptionList;
+                    ByteBuf cmd = Commands.newAddSubscriptionToTxn(
+                            requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
+                    OpForAddSubscriptionToTxnCallBack opNew = OpForAddSubscriptionToTxnCallBack
+                            .create(cmd, op.callback, txnID, subscriptionList);
+                    op.cmd.release();

Review comment:
       Using `ReferenceCountUtil.safeRelease()` is better here.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {
+                LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
+            }
         }
-
         onResponse(op);
     }
 
+    private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) {
+        if (cnx() == null) {
+            timer.newTimeout(timeout ->
+                    tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());
+    }
+
     private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
         if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {
             connectionHandler.reconnectLater(new TransactionCoordinatorClientException
                     .CoordinatorNotFoundException(message));
         }
 
-        if (op != null) {
+        if (op != null && error != ServerError.TransactionCoordinatorNotFound) {

Review comment:
       Don't change this.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
##########
@@ -228,14 +198,14 @@ public void start() throws Exception {
         Awaitility.await().until(() -> {

Review comment:
       We no longer need this logic. The same goes for reconnect.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),

Review comment:
       `cmd` should be retained; otherwise it will be released in `cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());`

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {
+                LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
+            }
         }
-
         onResponse(op);
     }
 
+    private <T> void tryExecuteCommandAgain(OpBase<T> op, long requestId) {
+        if (cnx() == null) {
+            timer.newTimeout(timeout ->
+                    tryExecuteCommandAgain(op, requestId), op.backoff.next(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        cnx().ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());
+    }
+
     private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
         if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {
             connectionHandler.reconnectLater(new TransactionCoordinatorClientException
                     .CoordinatorNotFoundException(message));
         }
 
-        if (op != null) {
+        if (op != null && error != ServerError.TransactionCoordinatorNotFound) {
             op.callback.completeExceptionally(getExceptionByServerError(error, message));
         }
     }
 
     private static abstract class OpBase<T> {
         protected ByteBuf cmd;
         protected CompletableFuture<T> callback;
+        protected Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 3, TimeUnit.SECONDS, 10,

Review comment:
       Use `client.getConfiguration().getInitialBackoffIntervalNanos()` and `client.getConfiguration().getMaxBackoffIntervalNanos()` to configure the backoff.
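
As a rough sketch of what a configurable backoff looks like, the class below takes the initial and maximum intervals as constructor arguments instead of hard-coding them. `SimpleBackoff` is a hypothetical stand-in, not Pulsar's `Backoff` class, and it assumes the two configuration getters named above return nanoseconds, as their names suggest; the sketch takes plain `long` values rather than calling them.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical exponential backoff driven by configured bounds.
final class SimpleBackoff {
    private final long initialNanos;
    private final long maxNanos;
    private long nextNanos;

    SimpleBackoff(long initialNanos, long maxNanos) {
        this.initialNanos = initialNanos;
        this.maxNanos = maxNanos;
        this.nextNanos = initialNanos;
    }

    // Returns the current delay in milliseconds, then doubles the delay
    // for the following call, capped at the configured maximum.
    long nextMillis() {
        long current = nextNanos;
        nextNanos = (nextNanos > maxNanos / 2) ? maxNanos : nextNanos * 2;
        return TimeUnit.NANOSECONDS.toMillis(current);
    }

    // Start again from the initial delay, for example after a success.
    void reset() {
        nextNanos = initialNanos;
    }
}
```

With this shape, each operation could be given a backoff built from the client's configured values, so retry timing follows the same settings as the rest of the client.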




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r766570924



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -206,8 +228,23 @@ void handleNewTxnResponse(CommandNewTxnResponse response) {
             }
             op.callback.complete(txnID);
         } else {
+            if (handleTransactionFailOp(response.getError(), response.getMessage(), op)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",

Review comment:
       Add the operation name to this log message.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -249,8 +283,24 @@ void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse res
             }
             op.callback.complete(null);
         } else {
+            if (handleTransactionFailOp(response.getError(), response.getMessage(), op)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    ByteBuf cmd = Commands.newAddPartitionToTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), op.partitions);
+                    OpForAddPublishPartitionToTxnCallBack opNew = OpForAddPublishPartitionToTxnCallBack
+                            .create(cmd, op.callback, txnID, op.partitions, op.backoff);
+                    onNewRequest(op, opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            }
             LOG.error("Add publish partition for request {} error {}.", response.getRequestId(), response.getError());

Review comment:
       Add the txnId to this log message.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -206,8 +228,23 @@ void handleNewTxnResponse(CommandNewTxnResponse response) {
             }
             op.callback.complete(txnID);
         } else {
+            if (handleTransactionFailOp(response.getError(), response.getMessage(), op)) {

Review comment:
       Suggested name: `checkIfNeedRetryByError`.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -432,38 +705,43 @@ private void onResponse(OpBase<?> op) {
         semaphore.release();
     }
 
-    private boolean canSendRequest(CompletableFuture<?> callback) {
-        if (!isValidHandlerState(callback)) {
-            return false;
-        }
+    private void canSendRequestNow(CompletableFuture<?> callback, long requestId, OpBase<?> op) {
         try {
             if (blockIfReachMaxPendingOps) {
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
                     callback.completeExceptionally(new TransactionCoordinatorClientException("Reach max pending ops."));
-                    return false;
+                    ReferenceCountUtil.safeRelease(op.cmd);
+                    op.recycle();
+                    return;
                 }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             callback.completeExceptionally(TransactionCoordinatorClientException.unwrap(e));
-            return false;
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+            return;
+        }
+        if (!checkStateAndSendRequest(callback, requestId, op)) {
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
         }
-        return true;
+
     }
 
-    private boolean isValidHandlerState(CompletableFuture<?> callback) {
+    private boolean checkStateAndSendRequest(CompletableFuture<?> callback, long requestId, OpBase<?> op) {
         switch (getState()) {
             case Ready:
+                pendingRequests.put(requestId, op);
+                timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));

Review comment:
       This is the only place where we send a request. So I think that when the state is Ready and `ClientCnx cnx = cnx()` is not null, we can send the request. When the connection state is Connecting, we should append the request to the queue; in any other state, we can retry after the next backoff interval.
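
The three cases described above can be sketched as a single dispatch method. This is only an illustration under assumptions: `RequestDispatcher`, its `State` and `Outcome` enums, and the `hasConnection` flag are hypothetical names, and request ids stand in for the real operation objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical dispatcher showing the send / queue / retry decision.
final class RequestDispatcher {
    enum State { READY, CONNECTING, CLOSED }
    enum Outcome { SENT, QUEUED, RETRY_AFTER_BACKOFF }

    private final Queue<Long> waiting = new ArrayDeque<>();
    private final List<Long> sent = new ArrayList<>();

    Outcome dispatch(State state, boolean hasConnection, long requestId) {
        // Ready with a live connection: send immediately.
        if (state == State.READY && hasConnection) {
            sent.add(requestId);
            return Outcome.SENT;
        }
        // Connecting: park the request until the connection opens.
        if (state == State.CONNECTING) {
            waiting.add(requestId);
            return Outcome.QUEUED;
        }
        // Anything else: the caller schedules a retry after a backoff delay.
        return Outcome.RETRY_AFTER_BACKOFF;
    }

    int sentCount() {
        return sent.size();
    }

    int waitingCount() {
        return waiting.size();
    }
}
```

Keeping the decision in one method means the connection is read once per attempt, and the caller only has to act on the returned outcome.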

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -176,21 +202,17 @@ private void failPendingRequest() {
         }
         CompletableFuture<TxnID> callback = new CompletableFuture<>();
 
-        if (!canSendRequest(callback)) {
-            return callback;
-        }
+
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout));
-        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForNewTxnCallBack op = OpForNewTxnCallBack.create(cmd, callback, timeout, unit, client);
+
+        canSendRequestNow(callback, requestId, op);

Review comment:
       Suggested name: `handleTransactionOpRequest`.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -294,7 +340,27 @@ public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnRespon
         } else {
             LOG.error("Add subscription to txn failed for request {} error {}.",
                     response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (handleTransactionFailOp(response.getError(), response.getMessage(), op)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    List<Subscription> subscriptionList = op.subscriptionList;
+                    ByteBuf cmd = Commands.newAddSubscriptionToTxn(
+                            requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
+                    OpForAddSubscriptionToTxnCallBack opNew = OpForAddSubscriptionToTxnCallBack
+                            .create(cmd, op.callback, txnID, subscriptionList,
+                                    op.backoff);
+                    onNewRequest(op, opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            LOG.error("Add subscription to txn failed for request {} error {}.",

Review comment:
       Add the operation name to this log message.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,67 +395,280 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
+            if (handleTransactionFailOp(response.getError(), response.getMessage(), op)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    ByteBuf cmd = Commands.serializeWithSize(Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action));
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack.create(cmd, op.callback, txnID, action,
+                                    op.backoff);
+                    onNewRequest(op, opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            }
             LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
 
+        }
         onResponse(op);
     }
 
-    private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
-        if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {
-            connectionHandler.reconnectLater(new TransactionCoordinatorClientException
-                    .CoordinatorNotFoundException(message));
+    private void onNewRequest(OpBase<?> oldOp, OpBase<?> opNew, long requestId) {
+        ReferenceCountUtil.safeRelease(oldOp.cmd);
+        oldOp.recycle();
+        tryInternalExecuteCommandAgain(opNew, requestId);
+    }
+
+    private <T> void tryInternalExecuteCommandAgain(OpBase<T> op, long requestId) {
+        ClientCnx cnx = cnx();
+        if (cnx == null) {
+            timer.newTimeout(timeout ->

Review comment:
       This should check the state as well.







[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r767078065



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -294,7 +340,27 @@ public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnRespon
         } else {
             LOG.error("Add subscription to txn failed for request {} error {}.",
                     response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (handleTransactionFailOp(response.getError(), response.getMessage(), op)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    List<Subscription> subscriptionList = op.subscriptionList;
+                    ByteBuf cmd = Commands.newAddSubscriptionToTxn(
+                            requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
+                    OpForAddSubscriptionToTxnCallBack opNew = OpForAddSubscriptionToTxnCallBack
+                            .create(cmd, op.callback, txnID, subscriptionList,
+                                    op.backoff);
+                    onNewRequest(op, opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            LOG.error("Add subscription to txn failed for request {} error {}.",

Review comment:
       This contains the name of the operation
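
The retry flow in the hunk above can be illustrated outside the client. The following is a minimal sketch, not the Pulsar implementation: `RetrySketch`, `Result`, `SimpleBackoff`, `send` and `attemptsUntilSuccess` are hypothetical names, and a `ScheduledExecutorService` stands in for the `timer` used in the diff. It assumes only what the diff shows: a retriable error reschedules the request after `backoff.next()` milliseconds, and any other error fails the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the retry-on-TransactionCoordinatorNotFound flow;
// none of these names come from the Pulsar client.
class RetrySketch {
    enum Result { OK, COORDINATOR_NOT_FOUND, OTHER_ERROR }

    // Doubles the delay on every call, capped at maxMs.
    static final class SimpleBackoff {
        private long nextMs;
        private final long maxMs;

        SimpleBackoff(long initialMs, long maxMs) {
            this.nextMs = initialMs;
            this.maxMs = maxMs;
        }

        long next() {
            long current = nextMs;
            nextMs = Math.min(nextMs * 2, maxMs);
            return current;
        }
    }

    static void send(Supplier<Result> request, SimpleBackoff backoff,
                     ScheduledExecutorService timer, CompletableFuture<Void> callback) {
        Result result = request.get();
        if (result == Result.OK) {
            callback.complete(null);
            return;
        }
        if (result == Result.COORDINATOR_NOT_FOUND) {
            // Retriable: schedule another attempt instead of failing the caller.
            timer.schedule(() -> send(request, backoff, timer, callback),
                    backoff.next(), TimeUnit.MILLISECONDS);
            return;
        }
        // Any other error is reported to the caller.
        callback.completeExceptionally(new IllegalStateException("request failed: " + result));
    }

    // Demo helper: the request fails `failures` times, then succeeds.
    // Returns the total number of attempts that were made.
    static int attemptsUntilSuccess(int failures) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger attempts = new AtomicInteger();
            CompletableFuture<Void> callback = new CompletableFuture<>();
            send(() -> attempts.incrementAndGet() <= failures
                            ? Result.COORDINATOR_NOT_FOUND : Result.OK,
                    new SimpleBackoff(1, 8), timer, callback);
            callback.get(5, TimeUnit.SECONDS);
            return attempts.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }
}
```

The caller's callback is only completed on success or on a non-retriable error, so a retriable error never surfaces to the caller in this sketch.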




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r766675485



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -158,6 +178,12 @@ public void connectionOpened(ClientCnx cnx) {
         }
     }
 
+    private void executedWaitingRequests() {
+        waitingExecutedRequests.forEach((requestId, op) -> {
+            tryInternalExecuteCommandAgain(op, requestId);

Review comment:
       Clear the queue after the waiting requests have been executed again.
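
One way to read this comment is that each parked request should be removed as it is re-issued, so a later reconnect does not send it a second time. A minimal sketch under that assumption follows; `WaitingRequestsSketch`, `park` and `drain` are hypothetical names, and a `String` stands in for the real op object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical illustration of draining a map of parked requests.
class WaitingRequestsSketch {
    private final Map<Long, String> waiting = new ConcurrentHashMap<>();

    void park(long requestId, String op) {
        waiting.put(requestId, op);
    }

    int size() {
        return waiting.size();
    }

    // Re-issue every parked request and remove it from the map.
    void drain(BiConsumer<Long, String> resend) {
        // ConcurrentHashMap iterators are weakly consistent, so removing
        // entries while iterating does not throw.
        for (Long requestId : waiting.keySet()) {
            String op = waiting.remove(requestId);
            if (op != null) { // null if another thread already took it
                resend.accept(requestId, op);
            }
        }
    }
}
```

Removing each entry before resending means a second call to `drain` finds nothing left to resend.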







[GitHub] [pulsar] congbobo184 merged pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 merged pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135


   





[GitHub] [pulsar] congbobo184 commented on a change in pull request #13135: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #13135:
URL: https://github.com/apache/pulsar/pull/13135#discussion_r763741588



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -335,41 +402,77 @@ void handleEndTxnResponse(CommandEndTxnResponse response) {
             }
             op.callback.complete(null);
         } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
             handleTransactionFailOp(response.getError(), response.getMessage(), op);
+            if (response.getError() == ServerError.TransactionCoordinatorNotFound) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Get a response for request {} error TransactionCoordinatorNotFound and try it again",
+                            response.getRequestId());
+                }
+                timer.newTimeout(timeout -> {
+                    long requestId = client.newRequestId();
+                    TxnID txnID = op.txnID;
+                    TxnAction action = op.action;
+                    BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(),
+                            txnID.getMostSigBits(), action);
+                    OpForEndTxnCallBack opNew = OpForEndTxnCallBack
+                            .create(Commands.serializeWithSize(cmd), op.callback, txnID, action);
+                    op.cmd.release();
+                    op.recycle();
+                    tryExecuteCommandAgain(opNew, requestId);
+                }, op.backoff.next(), TimeUnit.MILLISECONDS);
+                return;
+            } else {

Review comment:
       Use `return` here; we don't need the `else`.
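
The shape being asked for can be sketched as follows. This is an illustration only: `EarlyReturnSketch` and `handle` are hypothetical, and the local `ServerError` enum is a stand-in rather than the Pulsar type.

```java
// Hypothetical illustration of an early return replacing an else branch.
class EarlyReturnSketch {
    enum ServerError { TransactionCoordinatorNotFound, UnknownError }

    static String handle(ServerError error) {
        if (error == ServerError.TransactionCoordinatorNotFound) {
            // Retriable case: leave the method here.
            return "retry";
        }
        // No else needed: reaching this line already means the error is not retriable.
        return "fail";
    }
}
```

Because the retriable branch returns, the failure path can sit at the method's top indentation level.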



