Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:31 UTC

[pulsar] branch branch-2.9 updated (ac932d5 -> 1d63246)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from ac932d5  [Transaction] Fix generate transactionId some comment. (#13234)
     new 8fa7139  [Issue #12485][Python Client] cannot use any values that evaluates to False (#12489)
     new c81aec3  [Config] Add readWorkerThreadsThrottlingEnabled to conf/bookkeeper.conf (#12666)
     new 757c121  Some dependency in integration tests scope should be test (#12696)
     new 9db66c3  [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
     new 49caf87  fix shedding heartbeat ns (#13208)
     new 1510cf6  [Transaction] Fix performance (#13253)
     new bff7916  [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect (#13135)
     new 3e076bb  [Broker] Modify return result of NamespacesBase#internalGetPublishRate (#13237)
     new b09fd87  [Transaction] Remove request if can not send (#13308)
     new 22a24d6  [Transaction] Allow transaction be commit or abort in the state of aborting or committing. (#13323)
     new ebaf2b7  Fix Version.h not found when CMake binary directory is customized (#13324)
     new 7ca4552  [Transaction]stop TP replaying with Exception (#12700)
     new 914f172  [Transaction] Delete the redundant code (#13327)
     new b244d0c  [Transaction]Txn client check timeout (#12521)
     new 1d63246  Optimize transaction FieldUpdater to static final (#13396)

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/bookkeeper.conf                               |   5 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   6 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   1 +
 .../mledger/impl/ManagedCursorContainerTest.java   |   5 +
 .../broker/TransactionMetadataStoreService.java    |   8 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   8 +-
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  15 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  15 +-
 .../apache/pulsar/broker/loadbalance/LoadData.java |  10 +
 .../broker/loadbalance/impl/OverloadShedder.java   |   8 +-
 .../broker/loadbalance/impl/ThresholdShedder.java  |   6 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   6 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  68 ++-
 .../pendingack/impl/MLPendingAckStore.java         |  25 +-
 .../pulsar/common/naming/NamespaceBundle.java      |  12 +
 .../broker/namespace/NamespaceServiceTest.java     |   9 +-
 .../pulsar/broker/transaction/TransactionTest.java |  87 +++
 .../broker/transaction/TransactionTestBase.java    |  14 -
 .../api/AuthorizationProducerConsumerTest.java     |   2 +
 .../client/impl/ConsumerAckResponseTest.java       |   2 +
 .../impl}/TransactionClientConnectTest.java        | 218 +++-----
 .../client/impl/TransactionEndToEndTest.java       |  67 ++-
 .../TransactionCoordinatorClientException.java     |  20 +
 pulsar-client-cpp/CMakeLists.txt                   |   1 +
 .../python/pulsar/schema/definition.py             |   6 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   4 +
 .../client/impl/PartitionedProducerImpl.java       |   5 +
 .../apache/pulsar/client/impl/ProducerImpl.java    |  13 +
 .../pulsar/client/impl/PulsarClientImpl.java       |   1 +
 .../client/impl/TransactionMetaStoreHandler.java   | 585 ++++++++++++++-------
 .../impl/transaction/TransactionBuilderImpl.java   |   9 +-
 .../client/impl/transaction/TransactionImpl.java   |  65 ++-
 .../apache/pulsar/common/api/raw/RawMessage.java   |  29 +
 .../pulsar/common/api/raw/RawMessageImpl.java      |  44 ++
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 141 ++++-
 .../pulsar/sql/presto/TestReadChunkedMessages.java | 214 ++++++++
 .../pulsar/testclient/PerformanceConsumer.java     |  14 +-
 .../pulsar/testclient/PerformanceProducer.java     |  15 +-
 .../pulsar/testclient/PerformanceTransaction.java  |  34 +-
 .../pulsar/testclient/utils/PerformanceUtils.java  |  59 ---
 .../testclient/PerformanceTransactionTest.java     |  25 +-
 .../coordinator/impl/MLTransactionLogImpl.java     |   2 +-
 tests/integration/pom.xml                          |   2 +
 43 files changed, 1339 insertions(+), 546 deletions(-)
 rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/transaction => client/impl}/TransactionClientConnectTest.java (50%)
 create mode 100644 pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
 delete mode 100644 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java

[pulsar] 06/15: [Transaction] Fix performance (#13253)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1510cf627d7e90c656d908296ad2f2cd7a66fa39
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Dec 13 13:09:18 2021 +0800

    [Transaction] Fix performance (#13253)
    
    ### Motivation
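    `PerformanceProducer` omitted one counter update, and several log messages had inconsistent spacing around their separators.
    ### Modification
    1. Add the missing `messagesSent.increment()` call when an asynchronous send completes.
    2. Fix the spacing around the `---` separators in log messages.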
    
    (cherry picked from commit c531c1ca0d74e0c771786ffb9930611c82a0aefe)
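To illustrate why the missing `messagesSent.increment()` matters, here is a minimal sketch assuming, as the surrounding code suggests, that the tool keeps a per-interval `LongAdder` (drained on each periodic report via `sumThenReset()`) alongside a cumulative one. The class `SendStats` and its methods are hypothetical and are not the `PerformanceProducer` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the counters a producer benchmark keeps;
// the field names mirror the diff, but this is not PerformanceProducer itself.
final class SendStats {
    // Per-interval counter, assumed to be drained by each periodic report.
    final LongAdder messagesSent = new LongAdder();
    // Cumulative counter, used for the aggregated stats at the end of a run.
    final LongAdder totalMessagesSent = new LongAdder();

    // Behaviour before the fix: only the cumulative counter moves.
    void onSendCompleteBeforeFix() {
        totalMessagesSent.increment();
    }

    // Behaviour after the fix: both counters move.
    void onSendCompleteAfterFix() {
        messagesSent.increment();
        totalMessagesSent.increment();
    }

    // Messages per second over the last interval; resets the interval counter.
    double intervalRate(double elapsedSeconds) {
        return messagesSent.sumThenReset() / elapsedSeconds;
    }

    // Simulates n completed asynchronous sends, running the callback on completion.
    static void simulateSends(int n, Runnable onComplete) {
        for (int i = 0; i < n; i++) {
            CompletableFuture.completedFuture(null).thenRun(onComplete).join();
        }
    }
}
```

In the sketch, leaving out the increment makes the per-interval rate read 0 msg/s while the cumulative total stays correct, which is exactly the kind of omission that is easy to overlook in aggregated output.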
---
 .../org/apache/pulsar/testclient/PerformanceConsumer.java  |  4 ++--
 .../org/apache/pulsar/testclient/PerformanceProducer.java  |  3 ++-
 .../apache/pulsar/testclient/PerformanceTransaction.java   | 14 +++++++-------
 .../pulsar/testclient/PerformanceTransactionTest.java      |  2 +-
 4 files changed, 12 insertions(+), 11 deletions(-)
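The log fixes below all address the same pitfall: when a format string is split across several `+`-concatenated literals, a missing space at a seam yields output such as `Mbit/s--- AckRate`. One way to make the separator impossible to forget, sketched here as a hypothetical helper (the patch itself simply adds the spaces), is to join the segments with `String.join`.

```java
// Hypothetical helper: builds an SLF4J-style format string from segments
// separated by " --- ", so no segment needs its own leading or trailing space.
final class LogFormat {
    static final String SEPARATOR = " --- ";

    static String join(String... segments) {
        return String.join(SEPARATOR, segments);
    }

    // The concatenation used before #13253; note the missing space at the seam.
    static String beforeFix() {
        return "Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s"
                + "--- AckRate: {}  msg/s --- ack failed {} msg";
    }
}
```

For example, `LogFormat.join("{} Mbit/s", "AckRate: {} msg/s")` produces `{} Mbit/s --- AckRate: {} msg/s` without either literal carrying padding.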

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 134ffdd..9c15a0e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -573,7 +573,7 @@ public class PerformanceConsumer {
                         dec.format(rateAck));
             }
             log.info(
-                    "Throughput received: {} msg --- {}  msg/s -- {} Mbit/s  "
+                    "Throughput received: {} msg --- {}  msg/s --- {} Mbit/s  "
                             + "--- Latency: mean: {} ms - med: {} "
                             + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
                     intFormat.format(total),
@@ -622,7 +622,7 @@ public class PerformanceConsumer {
         }
         log.info(
             "Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s"
-                 + "--- AckRate: {}  msg/s --- ack failed {} msg",
+                 + " --- AckRate: {}  msg/s --- ack failed {} msg",
             totalMessagesReceived.sum(),
             dec.format(rate),
             dec.format(throughput),
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index cfd5568..20eb8f3 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -509,7 +509,7 @@ public class PerformanceProducer {
                 totalTxnOpSuccess = totalEndTxnOpSuccessNum.sum();
                 totalTxnOpFail = totalEndTxnOpFailNum.sum();
                 rateOpenTxn = numTxnOpSuccess.sumThenReset() / elapsed;
-                log.info("--- Transaction : {} transaction end successfully ---{} transaction end failed "
+                log.info("--- Transaction : {} transaction end successfully --- {} transaction end failed "
                                 + "--- {} Txn/s",
                         totalTxnOpSuccess, totalTxnOpFail, totalFormat.format(rateOpenTxn));
             }
@@ -728,6 +728,7 @@ public class PerformanceProducer {
                     PulsarClient pulsarClient = client;
                     messageBuilder.sendAsync().thenRun(() -> {
                         bytesSent.add(payloadData.length);
+                        messagesSent.increment();
 
                         totalMessagesSent.increment();
                         totalBytesSent.add(payloadData.length);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 49d441e..5127f85 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -533,8 +533,8 @@ public class PerformanceTransaction {
                     ? "Throughput transaction: {} transaction executes --- {} transaction/s"
                     : "Throughput task: {} task executes --- {} task/s";
             log.info(
-                    txnOrTaskLog + "  ---send Latency: mean: {} ms - med: {} "
-                            + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + "---ack Latency: "
+                    txnOrTaskLog + "  --- send Latency: mean: {} ms - med: {} "
+                            + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + " --- ack Latency: "
                             + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
                     intFormat.format(total),
                     dec.format(rate),
@@ -582,8 +582,8 @@ public class PerformanceTransaction {
                 "Aggregated throughput stats --- {} transaction executed --- {} transaction/s "
                         + " --- {} transaction open successfully --- {} transaction open failed"
                         + " --- {} transaction end successfully --- {} transaction end failed"
-                        + "--- {} message ack failed --- {} message send failed"
-                        + "--- {} message ack success --- {} message send success ",
+                        + " --- {} message ack failed --- {} message send failed"
+                        + " --- {} message ack success --- {} message send success ",
                 total,
                 dec.format(rate),
                 numTransactionOpenSuccess,
@@ -606,9 +606,9 @@ public class PerformanceTransaction {
         long numMessageSendFailed = numMessagesSendFailed.sum();
         long numMessageSendSuccess = numMessagesSendSuccess.sum();
         log.info(
-                "Aggregated throughput stats --- {} task executed --- {} task/s "
-                        + "--- {} message ack failed --- {} message send failed"
-                        + "--- {} message ack success --- {} message send success ",
+                "Aggregated throughput stats --- {} task executed --- {} task/s"
+                        + " --- {} message ack failed --- {} message send failed"
+                        + " --- {} message ack success --- {} message send success",
                 total,
                 totalFormat.format(rate),
                 numMessageAckFailed,
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index e04ea04..c5e62f7 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -212,7 +212,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         for (int i = 0; i < 505; i++) {
-            producer.newMessage().send();
+            producer.newMessage().value("messages for test transaction consumer".getBytes()).send();
         }
         Thread thread = new Thread(() -> {
             try {

[pulsar] 05/15: fix shedding heartbeat ns (#13208)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49caf878e1487dda6da509380c651366c0b8f0c7
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Mon Dec 13 19:44:55 2021 +0800

    fix shedding heartbeat ns (#13208)
    
    Related to #12252
    
    I found that the problem mentioned in #12252 had not been solved, because `HEARTBEAT_NAMESPACE_PATTERN` expects a namespace as input, but the load shedders actually pass it the full bundle name.
    
    1. Fix the pattern matching problem
    2. Add a test case for it
    
    This change is already covered by existing tests.
    
    (cherry picked from commit 78e3d8f7d872746db962be36ad3de49dac1ef015)
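The core of the bug is that a regular expression anchored to a whole namespace name cannot match a bundle name, which appends a `/<range>` suffix. The sketch below reproduces that and the fix: strip everything after the last `/` before matching. The two patterns are assumptions modeled on the heartbeat namespace formats `pulsar/<cluster>/<host>:<port>` and `pulsar/<host>:<port>`, not copied from `NamespaceService`; the SLA-monitor pattern is left out, and `bundleNamespace` omits the `NamespaceName.get` validation that `NamespaceBundle.getBundleNamespace` performs.

```java
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch of the #13208 fix; the patterns and names are illustrative assumptions.
final class HeartbeatFilter {
    // Assumed V1 format: pulsar/<cluster>/<host>:<port>
    static final Pattern HEARTBEAT_V1 = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
    // Assumed V2 format: pulsar/<host>:<port>
    static final Pattern HEARTBEAT_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");

    // Like NamespaceBundle.getBundleNamespace, minus validation:
    // "tenant/ns/0x00000000_0xffffffff" -> "tenant/ns".
    static String bundleNamespace(String bundle) {
        int index = bundle.lastIndexOf('/');
        return index == -1 ? bundle : bundle.substring(0, index);
    }

    // matches() requires the whole input to match, so a bundle suffix defeats it.
    static boolean isHeartbeatNamespace(String namespace) {
        return HEARTBEAT_V1.matcher(namespace).matches()
                || HEARTBEAT_V2.matcher(namespace).matches();
    }

    // Keeps only bundles whose namespace is not a heartbeat namespace,
    // in the spirit of LoadData#getBundleDataForLoadShedding.
    static <V> Map<String, V> sheddable(Map<String, V> bundleData) {
        return bundleData.entrySet().stream()
                .filter(e -> !isHeartbeatNamespace(bundleNamespace(e.getKey())))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

Centralizing the filter in one method, as the patch does with `getBundleDataForLoadShedding`, means both `OverloadShedder` and `ThresholdShedder` get the same exclusion rule instead of each repeating the pattern checks.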
---
 .../java/org/apache/pulsar/broker/loadbalance/LoadData.java  | 10 ++++++++++
 .../pulsar/broker/loadbalance/impl/OverloadShedder.java      |  8 ++------
 .../pulsar/broker/loadbalance/impl/ThresholdShedder.java     |  6 +-----
 .../org/apache/pulsar/broker/namespace/NamespaceService.java |  6 ++++++
 .../org/apache/pulsar/common/naming/NamespaceBundle.java     | 12 ++++++++++++
 .../apache/pulsar/broker/namespace/NamespaceServiceTest.java |  8 ++++++++
 6 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a469c5c..4243420 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.loadbalance;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 
 /**
  * This class represents all data that could be relevant when making a load management decision.
@@ -59,6 +62,13 @@ public class LoadData {
         return bundleData;
     }
 
+    public Map<String, BundleData> getBundleDataForLoadShedding() {
+        return bundleData.entrySet().stream()
+                .filter(e -> !NamespaceService.isSystemServiceNamespace(
+                        NamespaceBundle.getBundleNamespace(e.getKey())))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     public Map<String, Long> getRecentlyUnloadedBundles() {
         return recentlyUnloadedBundles;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 3f33fa3..985ed6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Map;
@@ -102,10 +100,8 @@ public class OverloadShedder implements LoadSheddingStrategy {
                 // Sort bundles by throughput, then pick the biggest N which combined
                 // make up for at least the minimum throughput to offload
 
-                loadData.getBundleData().entrySet().stream()
-                    .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
-                            && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
-                            && localData.getBundles().contains(e.getKey()))
+                loadData.getBundleDataForLoadShedding().entrySet().stream()
+                    .filter(e -> localData.getBundles().contains(e.getKey()))
                     .map((e) -> {
                         // Map to throughput value
                         // Consider short-term byte rate to address system resource burden
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 3e10326..afca708 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.HashMap;
@@ -105,9 +103,7 @@ public class ThresholdShedder implements LoadSheddingStrategy {
             MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
 
             if (localData.getBundles().size() > 1) {
-                loadData.getBundleData().entrySet().stream()
-                    .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
-                        && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
+                loadData.getBundleDataForLoadShedding().entrySet().stream()
                     .map((e) -> {
                         String bundle = e.getKey();
                         BundleData bundleData = e.getValue();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index f6cba9c..8f6bfb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1349,6 +1349,12 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public static boolean isSystemServiceNamespace(String namespace) {
+        return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
+                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
+    }
+
     public boolean registerSLANamespace() throws PulsarServerException {
         boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
         if (isNameSpaceRegistered) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
index 1531095..98dcb93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
@@ -152,6 +152,18 @@ public class NamespaceBundle implements ServiceUnitId, Comparable<NamespaceBundl
         return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
     }
 
+    public static String getBundleNamespace(String namespaceBundle) {
+        int index = namespaceBundle.lastIndexOf('/');
+        if (index != -1) {
+            try {
+                return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
+            } catch (Exception e) {
+                // return itself if meets invalid format
+            }
+        }
+        return namespaceBundle;
+    }
+
     public NamespaceBundleFactory getNamespaceBundleFactory() {
         return factory;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 8d35cd3..d45dcc2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -539,6 +539,14 @@ public class NamespaceServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testHeartbeatNamespaceMatch() throws Exception {
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+        NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
+        assertTrue(NamespaceService.isSystemServiceNamespace(
+                        NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {

[pulsar] 15/15: Optimize transaction FieldUpdater to static final (#13396)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1d6324605f348b4bb49bd81ca151c88b4c15bbc8
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Mon Dec 20 14:49:29 2021 +0800

    Optimize transaction FieldUpdater to static final (#13396)
    
    (cherry picked from commit b261d6abfc3a6a0707bf8ff6712a58cf7f0b6a1b)
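`AtomicReferenceFieldUpdater.newUpdater` locates the target field reflectively, and the resulting updater is not bound to any instance: it operates on whichever object is passed to each call. Declaring it `private final` but non-static, as before #13396, therefore built one identical updater per `TransactionImpl`. The hypothetical class below shows the `static final` pattern with a compare-and-set state transition; its `State` values are illustrative, not those of `TransactionImpl`.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical transaction-like object; only the updater pattern mirrors #13396.
final class TxnStateHolder {
    enum State { OPEN, COMMITTING, COMMITTED }

    // The target must be a volatile instance field of the declared reference type.
    private volatile State state = State.OPEN;

    // One updater shared by all instances, created once at class initialization.
    private static final AtomicReferenceFieldUpdater<TxnStateHolder, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(TxnStateHolder.class, State.class, "state");

    // Atomically moves from `expect` to `update`; returns false if the
    // current state is no longer `expect` (for example, another thread won).
    boolean transition(State expect, State update) {
        return STATE_UPDATER.compareAndSet(this, expect, update);
    }

    State state() {
        return state;
    }
}
```

Making the updater `static` does not change thread-safety, since the updater holds no per-instance state; it only removes the per-object allocation and reflective lookup.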
---
 .../java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 4128a6f..ebcb20e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -66,7 +66,7 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
     private final ArrayList<CompletableFuture<Void>> ackFutureList;
     private volatile State state;
-    private final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
+    private static final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
         AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
 
     @Override

[pulsar] 13/15: [Transaction] Delete the redundant code (#13327)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 914f172592c6d205cc380ea3edb76e0f03904e0b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 17 14:47:12 2021 +0800

    [Transaction] Delete the redundant code (#13327)
    
    The underlying problem has been resolved, so the extra wait-and-retry code is no longer needed.
    1. Delete the redundant code
    2. Clean up minor code style issues
    
    (cherry picked from commit fbe010323076ba2339e2339e3031a78e20b09061)
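With the helper removed, each tool opens its initial transaction inline: if transactions are enabled it blocks on `newTransaction().withTransactionTimeout(...).build().get()`, otherwise it stores `null`. The sketch below captures that shape with a hypothetical `Supplier<CompletableFuture<T>>` standing in for the Pulsar transaction builder, so it runs without a broker; it uses `join()` rather than `get()` only to avoid checked exceptions in the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical generalization of the inlined code; `opener` stands in for
// client.newTransaction().withTransactionTimeout(timeout, SECONDS).build().
final class InitialTransaction {
    static <T> AtomicReference<T> open(boolean enabled, Supplier<CompletableFuture<T>> opener) {
        if (enabled) {
            // Block until the first transaction is available.
            return new AtomicReference<>(opener.get().join());
        }
        // Transactions disabled: the opener is never invoked and the reference holds null.
        return new AtomicReference<>(null);
    }
}
```

One side effect visible in the `PerformanceTransaction` hunk: the creation now happens inside the existing `try` block, so a failure to open the first transaction takes the same error path (log, then `PerfClientUtils.exit(-1)`) as a failure to build producers or consumers.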
---
 .../broker/transaction/TransactionTestBase.java    | 14 -----
 .../pulsar/testclient/PerformanceConsumer.java     | 10 ++--
 .../pulsar/testclient/PerformanceProducer.java     | 12 +++--
 .../pulsar/testclient/PerformanceTransaction.java  | 20 +++++---
 .../pulsar/testclient/utils/PerformanceUtils.java  | 59 ----------------------
 .../testclient/PerformanceTransactionTest.java     | 23 ++++-----
 6 files changed, 38 insertions(+), 100 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index e13365d..fe7a813 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -61,8 +61,6 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.apache.zookeeper.ZooKeeper;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
 
 @Slf4j
 public abstract class TransactionTestBase extends TestRetrySupport {
@@ -144,8 +142,6 @@ public abstract class TransactionTestBase extends TestRetrySupport {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .enableTransaction(true)
                 .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(numPartitionsOfTC);
     }
 
     protected void startBroker() throws Exception {
@@ -332,14 +328,4 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             log.warn("Failed to clean up mocked pulsar service:", e);
         }
     }
-    public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
-        // wait tc init success to ready state
-        Awaitility.await()
-                .untilAsserted(() -> {
-                    int transactionMetaStoreCount = pulsarServiceList.stream()
-                            .mapToInt(pulsarService -> pulsarService.getTransactionMetadataStoreService().getStores().size())
-                            .sum();
-                    Assert.assertEquals(transactionMetaStoreCount, numOfTCPerBroker);
-                });
-    }
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 9c15a0e..3c7bed9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.testclient;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -362,8 +361,13 @@ public class PerformanceConsumer {
         }
         PulsarClient pulsarClient = clientBuilder.build();
 
-        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments.isEnableTransaction,
-                arguments.transactionTimeout);
+        AtomicReference<Transaction> atomicReference;
+        if (arguments.isEnableTransaction) {
+            atomicReference = new AtomicReference<>(pulsarClient.newTransaction()
+                    .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get());
+        } else {
+            atomicReference = new AtomicReference<>(null);
+        }
 
         AtomicLong messageAckedCount = new AtomicLong();
         Semaphore messageReceiveLimiter = new Semaphore(arguments.numMessagesPerTransaction);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 20eb8f3..0e3d550 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -74,7 +74,6 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
-import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction;
 
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -422,7 +421,7 @@ public class PerformanceProducer {
                 clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
             }
 
-            try (PulsarAdmin client = clientBuilder.build();) {
+            try (PulsarAdmin client = clientBuilder.build()) {
                 for (String topic : arguments.topics) {
                     log.info("Creating partitioned topic {} with {} partitions", topic, arguments.partitions);
                     try {
@@ -592,8 +591,15 @@ public class PerformanceProducer {
                     // enable round robin message routing if it is a partitioned topic
                     .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
 
+            AtomicReference<Transaction> transactionAtomicReference;
             if (arguments.isEnableTransaction) {
                 producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
+                transactionAtomicReference = new AtomicReference<>(client.newTransaction()
+                        .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS)
+                        .build()
+                        .get());
+            } else {
+                transactionAtomicReference = new AtomicReference<>(null);
             }
             if (arguments.producerName != null) {
                 String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId);
@@ -659,8 +665,6 @@ public class PerformanceProducer {
             }
             // Send messages on all topics/producers
             long totalSent = 0;
-            AtomicReference<Transaction> transactionAtomicReference = buildTransaction(client,
-                    arguments.isEnableTransaction, arguments.transactionTimeout);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 5127f85..eee284b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.testclient;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -284,7 +283,7 @@ public class PerformanceTransaction {
         ExecutorService executorService = new ThreadPoolExecutor(arguments.numTestThreads,
                 arguments.numTestThreads,
                 0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<>());
 
 
         long startTime = System.nanoTime();
@@ -311,16 +310,23 @@ public class PerformanceTransaction {
                     //A thread may perform tasks of multiple transactions in a traversing manner.
                     List<Producer<byte[]>> producers = null;
                     List<List<Consumer<byte[]>>> consumers = null;
+                    AtomicReference<Transaction> atomicReference = null;
                     try {
                         producers = buildProducers(client, arguments);
                         consumers = buildConsumer(client, arguments);
+                        if (!arguments.isDisableTransaction) {
+                            atomicReference = new AtomicReference<>(client.newTransaction()
+                                    .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS)
+                                    .build()
+                                    .get());
+                        } else {
+                            atomicReference = new AtomicReference<>(null);
+                        }
                     } catch (Exception e) {
                         log.error("Failed to build Producer/Consumer with exception : ", e);
                         executorService.shutdownNow();
                         PerfClientUtils.exit(-1);
                     }
-                    AtomicReference<Transaction> atomicReference = buildTransaction(client,
-                            !arguments.isDisableTransaction, arguments.transactionTimeout);
                     //The while loop has no break, and finally ends the execution through the shutdownNow of
                     //the executorService
                     while (true) {
@@ -351,7 +357,7 @@ public class PerformanceTransaction {
                         for (List<Consumer<byte[]>> subscriptions : consumers) {
                                 for (Consumer<byte[]> consumer : subscriptions) {
                                     for (int j = 0; j < arguments.numMessagesReceivedPerTransaction; j++) {
-                                        Message message = null;
+                                        Message<byte[]> message = null;
                                         try {
                                             message = consumer.receive();
                                         } catch (PulsarClientException e) {
@@ -690,9 +696,7 @@ public class PerformanceTransaction {
                 .sendTimeout(0, TimeUnit.SECONDS);
 
         final List<Future<Producer<byte[]>>> producerFutures = Lists.newArrayList();
-        Iterator<String> produceTopicsIterator = arguments.producerTopic.iterator();
-        while(produceTopicsIterator.hasNext()){
-            String topic = produceTopicsIterator.next();
+        for (String topic : arguments.producerTopic) {
             log.info("Create producer for topic {}", topic);
             producerFutures.add(producerBuilder.clone().topic(topic).createAsync());
         }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java
deleted file mode 100644
index ded1131..0000000
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.testclient.utils;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.testclient.PerformanceProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PerformanceUtils {
-
-    private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
-
-    public static AtomicReference<Transaction> buildTransaction(PulsarClient pulsarClient, boolean isEnableTransaction,
-                                                                long transactionTimeout) {
-
-        AtomicLong numBuildTxnFailed = new AtomicLong();
-        if (isEnableTransaction) {
-            while(true) {
-                AtomicReference atomicReference = null;
-                try {
-                    atomicReference = new AtomicReference(pulsarClient.newTransaction()
-                            .withTransactionTimeout(transactionTimeout, TimeUnit.SECONDS).build().get());
-                } catch (Exception e) {
-                    numBuildTxnFailed.incrementAndGet();
-                    if (numBuildTxnFailed.get()%10 == 0) {
-                        log.error("Failed to new a transaction with {} times", numBuildTxnFailed.get(), e);
-                    }
-                }
-                if (atomicReference != null && atomicReference.get() != null) {
-                    log.info("After {} failures, the transaction was created successfully for the first time",
-                            numBuildTxnFailed.get());
-                    return atomicReference;
-                }
-            }
-        }
-        return new AtomicReference<>(null);
-    }
-}
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index c5e62f7..a08fbe0 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -91,8 +91,8 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testTxnPerf() throws Exception {
         String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -np 1 -au %s";
-        String testConsumeTopic = testTopic + UUID.randomUUID().toString();
-        String testProduceTopic = testTopic + UUID.randomUUID().toString();
+        String testConsumeTopic = testTopic + UUID.randomUUID();
+        String testProduceTopic = testTopic + UUID.randomUUID();
         String testSub = "testSub";
         admin.topics().createPartitionedTopic(testConsumeTopic, 1);
         String args = String.format(argString, testConsumeTopic, testProduceTopic,
@@ -119,9 +119,8 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
         CountDownLatch countDownLatch = new CountDownLatch(50);
         for (int i = 0; i < 50
                 ; i++) {
-            produceToConsumeTopic.newMessage().value(("testConsume " + i).getBytes()).sendAsync().thenRun(() -> {
-                countDownLatch.countDown();
-            });
+            produceToConsumeTopic.newMessage().value(("testConsume " + i).getBytes()).sendAsync().thenRun(
+                    countDownLatch::countDown);
         }
 
         countDownLatch.await();
@@ -149,11 +148,11 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         for (int i = 0; i < 50; i++) {
-            Message message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
+            Message<byte[]> message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             consumeFromProduceTopic.acknowledge(message);
         }
-        Message message = consumeFromConsumeTopic.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumeFromConsumeTopic.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
         message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
@@ -187,16 +186,16 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .enableBatchIndexAcknowledgment(false)
                 .subscribe();
         for (int i = 0; i < totalMessage; i++) {
-           Message message = consumer.receive(2, TimeUnit.SECONDS);
+           Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(message);
            consumer.acknowledge(message);
         }
-        Message message = consumer.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
     }
 
     @Test
-    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException, ExecutionException {
+    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
         String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
         String subName = "sub";
         String topic = testTopic + UUID.randomUUID();
@@ -230,10 +229,10 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .enableBatchIndexAcknowledgment(false)
                .subscribe();
         for (int i = 0; i < 5; i++) {
-            Message message = consumer.receive(2, TimeUnit.SECONDS);
+            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
         }
-        Message message = consumer.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
     }
 

[pulsar] 12/15: [Transaction]stop TP replaying with Exception (#12700)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7ca4552ac4aaeb81b409764da34cccee0e81a97b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 17 22:28:29 2021 +0800

    [Transaction]stop TP replaying with Exception (#12700)
    
    When MLPendingAckStore is replaying and a ledger has been deleted from BookKeeper, or the ManagedLedger has been fenced, MLPendingAckStore does not stop recovering and keeps reporting the exception.
    
    Stop replaying when there is no ledger left to read or the ManagedLedger is fenced.
    
    Add a unit test.
    
    (cherry picked from commit a962137f530cd2d6c2315749270a1d2cae8b1cc2)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 +++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 +
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++
 .../pendingack/impl/MLPendingAckStore.java         | 25 +++++----
 .../pulsar/broker/transaction/TransactionTest.java | 61 ++++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  2 +-
 6 files changed, 89 insertions(+), 11 deletions(-)
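
The replay change above can be illustrated with a small, self-contained Java sketch. It is not the MLPendingAckStore code: `ReplaySketch`, `Replayer`, `EntrySource`, and the two exception classes are hypothetical stand-ins, and reads here are synchronous, whereas the real store reads asynchronously through `ManagedCursor`. The sketch shows the shape of the fix under those assumptions: `fillQueue()` returns the `isReadable` flag, the failure callback clears that flag for a non-recoverable ledger (only when auto-skip is enabled) or for a fenced ledger, and the replay loop exits as soon as the flag is false. Java groups the committed condition `a && b || c` as `(a && b) || c`, and the sketch writes those parentheses out explicitly.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ReplaySketch {

    // Hypothetical stand-ins for ManagedLedgerException.NonRecoverableLedgerException
    // and ManagedLedgerException.ManagedLedgerFencedException.
    static class NonRecoverableLedgerException extends Exception { }
    static class FencedException extends Exception { }

    // Hypothetical entry source: yields positions 0..failAt-1, then always throws `failure`.
    static class EntrySource {
        private final int failAt;
        private final Exception failure;
        private long next = 0;

        EntrySource(int failAt, Exception failure) {
            this.failAt = failAt;
            this.failure = failure;
        }

        long read() throws Exception {
            if (next >= failAt) {
                throw failure;
            }
            return next++;
        }
    }

    static class Replayer {
        private final EntrySource source;
        private final boolean autoSkipNonRecoverableData;
        private final Queue<Long> entryQueue = new ArrayDeque<>();
        // volatile because in the real store the read callback runs on another thread.
        private volatile boolean isReadable = true;

        Replayer(EntrySource source, boolean autoSkipNonRecoverableData) {
            this.source = source;
            this.autoSkipNonRecoverableData = autoSkipNonRecoverableData;
        }

        // Same shape as fillQueue(): issue one read, then report whether replay may continue.
        boolean fillQueue() {
            try {
                entryQueue.add(source.read());
            } catch (Exception e) {
                readEntriesFailed(e);
            }
            return isReadable;
        }

        // Only errors that a retry cannot fix clear the flag; any other error is retried.
        void readEntriesFailed(Exception e) {
            if ((autoSkipNonRecoverableData && e instanceof NonRecoverableLedgerException)
                    || e instanceof FencedException) {
                isReadable = false;
            }
        }

        boolean isReadable() {
            return isReadable;
        }

        // Replays until lastConfirmed is reached or the source becomes unreadable.
        // Returns the number of entries applied.
        long replay(long lastConfirmed) {
            long currentPosition = -1;
            long applied = 0;
            while (lastConfirmed > currentPosition && fillQueue()) {
                Long entry = entryQueue.poll();
                if (entry != null) {
                    currentPosition = entry;
                    applied++;
                }
            }
            return applied;
        }
    }

    public static void main(String[] args) {
        Replayer fenced = new Replayer(new EntrySource(2, new FencedException()), false);
        // prints "applied=2 readable=false"
        System.out.println("applied=" + fenced.replay(4) + " readable=" + fenced.isReadable());
    }
}
```

Any other read error leaves the flag set, so the loop keeps retrying. With a source that never recovers from such an error, this sketch spins just as the store did before the fix; the change only targets the two cases where retrying cannot succeed.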

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 72ee1a1..d1fb90a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -710,4 +710,10 @@ public interface ManagedCursor {
      * @return if read position changed
      */
     boolean checkAndUpdateReadPositionChanged();
+
+    /**
+     * Checks if the cursor is closed.
+     * @return whether this cursor is closed.
+     */
+    public boolean isClosed();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 521ec7a..3d382ad 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -837,6 +837,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    @Override
     public boolean isClosed() {
         return state == State.Closed || state == State.Closing;
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 57e1964..af30c9c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -385,6 +385,11 @@ public class ManagedCursorContainerTest {
         public boolean checkAndUpdateReadPositionChanged() {
             return false;
         }
+
+        @Override
+        public boolean isClosed() {
+            return false;
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index fb88878..1592318 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -34,7 +34,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -303,13 +302,12 @@ public class MLPendingAckStore implements PendingAckStore {
         @Override
         public void run() {
             try {
-                while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
-                    if (((ManagedCursorImpl) cursor).isClosed()) {
-                        log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
-                                cursor.getManagedLedger().getName());
-                        return;
-                    }
-                    fillEntryQueueCallback.fillQueue();
+                if (cursor.isClosed()) {
+                    log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
+                            cursor.getManagedLedger().getName());
+                    return;
+                }
+                while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) {
                     Entry entry = entryQueue.poll();
                     if (entry != null) {
                         ByteBuf buffer = entry.getDataBuffer();
@@ -361,15 +359,17 @@ public class MLPendingAckStore implements PendingAckStore {
 
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
+        private volatile boolean isReadable = true;
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
 
-        void fillQueue() {
+        boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     readAsync(100, this);
                 }
             }
+            return isReadable;
         }
 
         @Override
@@ -389,7 +389,12 @@ public class MLPendingAckStore implements PendingAckStore {
 
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-            log.error("MLPendingAckStore stat reply fail!", exception);
+            if (managedLedger.getConfig().isAutoSkipNonRecoverableData()
+                    && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                isReadable = false;
+            }
+            log.error("MLPendingAckStore of topic [{}] stat reply fail!", managedLedger.getName(), exception);
             outstandingReadsRequests.decrementAndGet();
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 90ddc4b..9cad6fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -60,8 +61,10 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -76,6 +79,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -503,6 +507,63 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
+    public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
+        String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
+        admin.topics().createNonPartitionedTopic(topic);
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("test")
+                .enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic)
+                .create();
+        producer.newMessage().send();
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, false).get().get();
+        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
+        PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
+                .createSubscription("test",
+                CommandSubscribe.InitialPosition.Earliest, false).get();
+
+        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
+        doReturn(true).when(managedCursor).hasMoreEntries();
+        doReturn(false).when(managedCursor).isClosed();
+        doReturn(new PositionImpl(-1, -1)).when(managedCursor).getMarkDeletedPosition();
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
+                    null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
+        doReturn(CompletableFuture.completedFuture(
+                new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null)))
+                .when(pendingAckStoreProvider).newPendingAckStore(any());
+        doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());
+
+        Class<PulsarService> pulsarServiceClass = PulsarService.class;
+        Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider");
+        field.setAccessible(true);
+        field.set(getPulsarServiceList().get(0), pendingAckStoreProvider);
+
+        PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(pendingAckHandle1.getStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(pendingAckHandle2.getStats().state, "Ready"));
+    }
+
+    @Test
     public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
         String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
         admin.topics().createNonPartitionedTopic(topic);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index e154bb8..8bf2ebf 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -240,7 +240,7 @@ public class MLTransactionLogImpl implements TransactionLog {
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
-        private boolean isReadable = true;
+        private volatile boolean isReadable = true;
 
         boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {

[pulsar] 11/15: Fix Version.h not found when CMake binary directory is customized (#13324)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ebaf2b7a573e03950e6db52e02c0dae1ed012458
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Dec 15 17:08:09 2021 +0800

    Fix Version.h not found when CMake binary directory is customized (#13324)
    
    ### Motivation
    
    When I built the C++ tests in my local environment, the following error occurred.
    
    ```
    tests/VersionTest.cc:19:10: fatal error: 'pulsar/Version.h' file not found
    #include <pulsar/Version.h>
    ```
    
    This is because I specified a different directory as the CMake build directory.
    
    ```bash
    mkdir _builds && cd _builds && cmake ..
    ```
    
    Since https://github.com/apache/pulsar/pull/12769, `Version.h` is generated under the `${CMAKE_BINARY_DIR}/include/pulsar` directory, but that directory is not added to the include paths in `CMakeLists.txt`. CI works because it builds in the default CMake directory, so `CMAKE_BINARY_DIR` is the same as `CMAKE_SOURCE_DIR`, which is included.
    
    ### Modifications
    
    Add `${CMAKE_BINARY_DIR}/include` to `include_directories`.
    
    (cherry picked from commit ca37e67211feda4f7e0984e6414e707f1c1dfd07)
---
 pulsar-client-cpp/CMakeLists.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 8521dbf..8ba9569 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -334,6 +334,7 @@ file(MAKE_DIRECTORY ${AUTOGEN_DIR})
 include_directories(
   ${CMAKE_SOURCE_DIR}
   ${CMAKE_SOURCE_DIR}/include
+  ${CMAKE_BINARY_DIR}/include
   ${AUTOGEN_DIR}
   ${Boost_INCLUDE_DIR}
   ${OPENSSL_INCLUDE_DIR}

[pulsar] 09/15: [Transaction] Remove request if can not send (#13308)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b09fd872aa61138fc979e05868096c21fdcf0509
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Dec 15 12:48:41 2021 +0800

    [Transaction] Remove request if can not send (#13308)
    
    (cherry picked from commit eb42df7126ac4015c67f6989ec083ee173dce3f4)
---
 .../client/impl/TransactionMetaStoreHandler.java   | 44 +++++++++++++++-------
 1 file changed, 30 insertions(+), 14 deletions(-)
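
The effect of this change can be sketched in isolation, assuming a much simplified handler. `PendingRequestSketch`, its `State` enum, `newRequest()`, and `pendingCount()` are hypothetical; they collapse the real `TransactionMetaStoreHandler` (which registers requests on `internalPinnedExecutor`, also records them in `timeoutQueue`, and retries with backoff) into a few lines. The point illustrated is the new contract: `checkStateAndSendRequest` returns `false` when it has already failed the callback, and the caller then removes the request from `pendingRequests`, so no entry is left behind for a request that can no longer be answered.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PendingRequestSketch {

    // Simplified handler states (the real handler uses HandlerState.State).
    enum State { READY, CONNECTING, CLOSING, CLOSED, FAILED }

    private final Map<Long, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final State state; // fixed for the lifetime of this sketch

    PendingRequestSketch(State state) {
        this.state = state;
    }

    // Registers a request, tries to send it, and unregisters it if it was failed immediately.
    CompletableFuture<String> newRequest() {
        long requestId = requestIdGenerator.getAndIncrement();
        CompletableFuture<String> callback = new CompletableFuture<>();
        pendingRequests.put(requestId, callback);
        if (!checkStateAndSendRequest(callback)) {
            // The callback is already completed exceptionally; nothing will answer this id.
            pendingRequests.remove(requestId);
        }
        return callback;
    }

    // Returns true if the request was sent or remains registered; false if it was failed.
    private boolean checkStateAndSendRequest(CompletableFuture<String> callback) {
        switch (state) {
            case READY:
                // The real handler writes the command to the connection here; omitted.
                return true;
            case CONNECTING:
                // Stays registered; what happens after reconnecting is not modelled here.
                return true;
            case CLOSING:
            case CLOSED:
                callback.completeExceptionally(new IllegalStateException("handler is closing or closed"));
                return false;
            default:
                callback.completeExceptionally(new IllegalStateException("handler not connected"));
                return false;
        }
    }

    int pendingCount() {
        return pendingRequests.size();
    }

    public static void main(String[] args) {
        PendingRequestSketch closed = new PendingRequestSketch(State.CLOSED);
        CompletableFuture<String> result = closed.newRequest();
        // prints "failed=true pending=0"
        System.out.println("failed=" + result.isCompletedExceptionally() + " pending=" + closed.pendingCount());
    }
}
```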

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index b2b756a..3c6286b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -200,7 +200,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -249,7 +251,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -279,7 +283,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
 
         return callback;
@@ -329,7 +335,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -360,7 +368,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -408,7 +418,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -437,7 +449,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -486,7 +500,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -634,7 +650,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         return true;
     }
 
-    private void checkStateAndSendRequest(OpBase<?> op) {
+    private boolean checkStateAndSendRequest(OpBase<?> op) {
         switch (getState()) {
             case Ready:
                 ClientCnx cnx = cnx();
@@ -644,9 +660,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                 } else {
                     LOG.error("The cnx was null when the TC handler was ready", new NullPointerException());
                 }
-                break;
+                return true;
             case Connecting:
-                break;
+                return true;
             case Closing:
             case Closed:
                 op.callback.completeExceptionally(
@@ -655,7 +671,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         + transactionCoordinatorId
                                         + " is closing or closed."));
                 onResponse(op);
-                break;
+                return false;
             case Failed:
             case Uninitialized:
                 op.callback.completeExceptionally(
@@ -664,13 +680,13 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         + transactionCoordinatorId
                                         + " not connected."));
                 onResponse(op);
-                break;
+                return false;
             default:
                 op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 transactionCoordinatorId));
                 onResponse(op);
-                break;
+                return false;
         }
     }
 

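The change to `checkStateAndSendRequest` above follows a register-then-roll-back pattern. The request is put into `pendingRequests` before the send attempt, and it is removed again when the handler state means it was failed immediately. The sketch below is a simplified, hypothetical model of that idea. `PendingRequestSketch`, its `State` enum, and `send` are illustrative names, not Pulsar's `TransactionMetaStoreHandler` API, and the real handler also writes to the connection and calls `onResponse(op)`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the pattern in #13308: a request is registered
// in pendingRequests before sending and removed again if the state check fails it.
public class PendingRequestSketch {

    // Simplified handler states; names mirror the diff, but this is not Pulsar's enum.
    enum State { Ready, Connecting, Closing, Closed, Failed, Uninitialized }

    private final Map<Long, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();
    private final State state;

    PendingRequestSketch(State state) {
        this.state = state;
    }

    // Returns true when the request was sent or will be resent after reconnecting,
    // false when it was completed exceptionally and must not stay pending.
    private boolean checkStateAndSendRequest(CompletableFuture<String> callback) {
        switch (state) {
            case Ready:
                // A real handler would write the command to the connection here.
                return true;
            case Connecting:
                // Keep the request pending; it can be sent once the connection is ready.
                return true;
            default:
                callback.completeExceptionally(new IllegalStateException("handler is " + state));
                return false;
        }
    }

    CompletableFuture<String> send(long requestId) {
        CompletableFuture<String> callback = new CompletableFuture<>();
        pendingRequests.put(requestId, callback);
        if (!checkStateAndSendRequest(callback)) {
            // Without this removal, the failed request would linger in pendingRequests.
            pendingRequests.remove(requestId);
        }
        return callback;
    }

    int pendingCount() {
        return pendingRequests.size();
    }

    public static void main(String[] args) {
        PendingRequestSketch ready = new PendingRequestSketch(State.Ready);
        ready.send(1L);
        System.out.println(ready.pendingCount()); // 1 (awaiting a response)

        PendingRequestSketch closed = new PendingRequestSketch(State.Closed);
        CompletableFuture<String> f = closed.send(2L);
        System.out.println(closed.pendingCount() + " " + f.isCompletedExceptionally()); // 0 true
    }
}
```

Returning a boolean keeps the cleanup decision at the call site, where `requestId` is known. This is why each caller in the diff wraps the call in `if (!checkStateAndSendRequest(op))`.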
[pulsar] 03/15: Some depdency in integration tests scope should be test (#12696)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 757c121f6e2a31a040038d39576d244e4b3f28f2
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Wed Nov 10 20:44:58 2021 +0800

    Some depdency in integration tests scope should be test (#12696)
    
    (cherry picked from commit b27a7169e8538430b8b31fe39e2615a8236be436)
---
 tests/integration/pom.xml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index bfaa069..dcbaa8f 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -148,12 +148,14 @@
     <dependency>
   	  <groupId>org.elasticsearch.client</groupId>
   	  <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <scope>test</scope>
   	</dependency>
 
     <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>${rabbitmq-client.version}</version>
+      <scope>test</scope>
     </dependency>
 
     <dependency>

[pulsar] 04/15: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9db66c31370c0b2a693f92ea8fba6d02e4a9bdd4
Author: ran <ga...@126.com>
AuthorDate: Mon Dec 13 11:20:47 2021 +0800

    [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
    
    ### Motivation
    
    Previously, Pulsar SQL did not support querying chunked messages.
    
    ### Modifications
    
    Add a chunked-message map to `PulsarRecordCursor` to track incomplete chunked messages. Once all chunks of a message have been received, the reassembled message is offered to the message queue, where it waits for deserialization.
    
    (cherry picked from commit 93b74b5498ced9e75512c593e6e5a9f5a6c8f26b)
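The reassembly rule described above can be sketched in isolation. Chunks are keyed by the message UUID, each chunk must arrive in order (`chunkId == lastChunkId + 1`), a gap or a missing first chunk discards the partial buffer, and the complete payload is emitted only when the last chunk arrives. This is a hypothetical simplification, not the `PulsarRecordCursor` code. It uses `byte[]` and `ByteArrayOutputStream` instead of Netty `ByteBuf`s, a plain `HashMap` instead of `ConcurrentOpenHashMap`, and no object recycling. `ChunkReassemblySketch` and `onChunk` are illustrative names.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the chunk-reassembly idea in #12720 (not the Pulsar code).
public class ChunkReassemblySketch {

    // Per-message context: bytes accumulated so far and the last chunk id appended.
    static final class Ctx {
        final ByteArrayOutputStream buffer;
        int lastChunkId = -1;

        Ctx(int totalSize) {
            this.buffer = new ByteArrayOutputStream(totalSize);
        }
    }

    private final Map<String, Ctx> incomplete = new HashMap<>();

    // Returns the full payload when the final chunk arrives; returns null while the
    // message is still incomplete or after an unexpected chunk was dropped.
    byte[] onChunk(String uuid, int chunkId, int numChunks, int totalSize, byte[] data) {
        if (chunkId == 0) {
            incomplete.putIfAbsent(uuid, new Ctx(totalSize));
        }
        Ctx ctx = incomplete.get(uuid);
        if (ctx == null || chunkId != ctx.lastChunkId + 1 || chunkId >= numChunks) {
            // First chunk missing (e.g. it belongs to another split) or a gap:
            // discard anything accumulated for this uuid.
            incomplete.remove(uuid);
            return null;
        }
        ctx.buffer.write(data, 0, data.length);
        ctx.lastChunkId = chunkId;
        if (chunkId != numChunks - 1) {
            return null; // wait for the remaining chunks
        }
        incomplete.remove(uuid);
        return ctx.buffer.toByteArray();
    }

    // Mirrors the extra `chunkedMessagesMap.isEmpty()` checks: a split should not be
    // considered finished while a partially reassembled message remains.
    boolean hasIncomplete() {
        return !incomplete.isEmpty();
    }

    public static void main(String[] args) {
        ChunkReassemblySketch r = new ChunkReassemblySketch();
        System.out.println(r.onChunk("producer-1", 0, 2, 4, new byte[] {1, 2})); // null
        System.out.println(r.hasIncomplete());                                   // true
        byte[] full = r.onChunk("producer-1", 1, 2, 4, new byte[] {3, 4});
        System.out.println(full.length + " " + r.hasIncomplete());              // 4 false
    }
}
```

The UUID `producer-1` follows the `"%s-%d"` (producer name, sequence id) format that `ProducerImpl` uses for chunked messages. The producer name itself is made up for the example.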
---
 .../apache/pulsar/client/impl/ProducerImpl.java    |   9 +
 .../apache/pulsar/common/api/raw/RawMessage.java   |  29 +++
 .../pulsar/common/api/raw/RawMessageImpl.java      |  44 +++++
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 141 ++++++++++++--
 .../pulsar/sql/presto/TestReadChunkedMessages.java | 214 +++++++++++++++++++++
 5 files changed, 424 insertions(+), 13 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cc978a1..a3ca386 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -463,7 +463,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     sequenceId = msgMetadata.getSequenceId();
                 }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
+                        msg.getMessageBuilder().getSchemaVersion() : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                    if (chunkId > 0 && schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
                     serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                             readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
                             compressedPayload.readableBytes(), uncompressedSize, callback);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index d093628..483b5a3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -121,4 +121,33 @@ public interface RawMessage {
      * @return true if the key is base64 encoded, false otherwise
      */
     boolean hasBase64EncodedKey();
+
+    /**
+     * Get uuid of chunked message.
+     *
+     * @return uuid
+     */
+    String getUUID();
+
+    /**
+     * Get chunkId of chunked message.
+     *
+     * @return chunkId
+     */
+    int getChunkId();
+
+    /**
+     * Get chunk num of chunked message.
+     *
+     * @return chunk num
+     */
+    int getNumChunksFromMsg();
+
+    /**
+     * Get chunk message total size in bytes.
+     *
+     * @return chunked message total size in bytes
+     */
+    int getTotalChunkMsgSize();
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index defc1b4..3aa0cbc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -81,6 +81,14 @@ public class RawMessageImpl implements RawMessage {
         return msg;
     }
 
+    public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) {
+        if (!msgMetadata.getMetadata().hasNumChunksFromMsg() || msgMetadata.getMetadata().getNumChunksFromMsg() <= 1) {
+            throw new RuntimeException("The update payload operation only support multi chunked messages.");
+        }
+        payload = chunkedTotalPayload;
+        return this;
+    }
+
     @Override
     public Map<String, String> getProperties() {
         if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
@@ -170,6 +178,42 @@ public class RawMessageImpl implements RawMessage {
         return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
     }
 
+    @Override
+    public String getUUID() {
+        if (msgMetadata.getMetadata().hasUuid()) {
+            return msgMetadata.getMetadata().getUuid();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public int getChunkId() {
+        if (msgMetadata.getMetadata().hasChunkId()) {
+            return msgMetadata.getMetadata().getChunkId();
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public int getNumChunksFromMsg() {
+        if (msgMetadata.getMetadata().hasNumChunksFromMsg()) {
+            return msgMetadata.getMetadata().getNumChunksFromMsg();
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public int getTotalChunkMsgSize() {
+        if (msgMetadata.getMetadata().hasTotalChunkMsgSize()) {
+            return msgMetadata.getMetadata().getTotalChunkMsgSize();
+        } else {
+            return -1;
+        }
+    }
+
     public int getBatchSize() {
         return msgMetadata.getMetadata().getNumMessagesInBatch();
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index b1230d3..558b87b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -29,6 +29,9 @@ import com.google.common.annotations.VisibleForTesting;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
 import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.block.Block;
@@ -58,6 +61,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.api.raw.RawMessageIdImpl;
+import org.apache.pulsar.common.api.raw.RawMessageImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
 import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
 import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
@@ -112,6 +118,8 @@ public class PulsarRecordCursor implements RecordCursor {
 
     PulsarDispatchingRowDecoderFactory decoderFactory;
 
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
@@ -265,7 +273,8 @@ public class PulsarRecordCursor implements RecordCursor {
                             metricsTracker.register_BYTES_READ(bytes);
 
                             // check if we have processed all entries in this split
-                            if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                            // and no incomplete chunked messages exist
+                            if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
                                 return;
                             }
 
@@ -279,15 +288,25 @@ public class PulsarRecordCursor implements RecordCursor {
                                                 // start time for message queue read
                                                 metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
 
-                                                while (true) {
-                                                    if (!haveAvailableCacheSize(
-                                                            messageQueueCacheSizeAllocator, messageQueue)
-                                                            || !messageQueue.offer(message)) {
-                                                        Thread.sleep(1);
-                                                    } else {
-                                                        messageQueueCacheSizeAllocator.allocate(
-                                                                message.getData().readableBytes());
-                                                        break;
+                                                if (message.getNumChunksFromMsg() > 1)  {
+                                                    message = processChunkedMessages(message);
+                                                } else if (entryExceedSplitEndPosition(entry)) {
+                                                    // skip no chunk or no multi chunk message
+                                                    // that exceed split end position
+                                                    message.release();
+                                                    message = null;
+                                                }
+                                                if (message != null) {
+                                                    while (true) {
+                                                        if (!haveAvailableCacheSize(
+                                                                messageQueueCacheSizeAllocator, messageQueue)
+                                                                || !messageQueue.offer(message)) {
+                                                            Thread.sleep(1);
+                                                        } else {
+                                                            messageQueueCacheSizeAllocator.allocate(
+                                                                    message.getData().readableBytes());
+                                                            break;
+                                                        }
                                                     }
                                                 }
 
@@ -328,6 +347,10 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
+    private boolean entryExceedSplitEndPosition(Entry entry) {
+        return ((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
+    }
+
     @VisibleForTesting
     class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
 
@@ -341,8 +364,9 @@ public class PulsarRecordCursor implements RecordCursor {
         public void run() {
 
             if (outstandingReadsRequests.get() > 0) {
-                if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
-                        .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                if (!cursor.hasMoreEntries() ||
+                        (((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0
+                                && chunkedMessagesMap.isEmpty())) {
                     isDone = true;
 
                 } else {
@@ -408,7 +432,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
         public boolean hasFinished() {
             return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
-                && splitSize <= entriesProcessed;
+                && splitSize <= entriesProcessed && chunkedMessagesMap.isEmpty();
         }
 
         @Override
@@ -732,4 +756,95 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happen when the beginning chunk didn't belong to this split.
+            log.info("Received unexpected chunk. messageId: %s, last-chunk-id: %s chunkId: %s, totalChunks: %s",
+                    message.getMessageId(),
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+                    numChunks);
+            if (chunkedMsgCtx != null) {
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            chunkedMessagesMap.remove(uuid);
+            message.release();
+            return null;
+        }
+
+        // append the chunked payload and update lastChunkedMessage-id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if final chunk is not received yet then release payload and return
+        if (chunkId != (numChunks - 1)) {
+            message.release();
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed. chunkId: %s, totalChunks: %s, msgId: %s, sequenceId: %s",
+                    chunkId, numChunks, rawMessageId, message.getSequenceId());
+        }
+        chunkedMessagesMap.remove(uuid);
+        ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        // The chunked message complete, we use the entire payload to instead of the last chunk payload.
+        return ((RawMessageImpl) message).updatePayloadForChunkedMessage(unCompressedPayload);
+    }
+
+    static class ChunkedMessageCtx {
+
+        protected int totalChunks = -1;
+        protected ByteBuf chunkedMsgBuffer;
+        protected int lastChunkedMessageId = -1;
+
+        static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+            ChunkedMessageCtx ctx = RECYCLER.get();
+            ctx.totalChunks = numChunksFromMsg;
+            ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+            return ctx;
+        }
+
+        private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
+
+        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+            protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+                return new ChunkedMessageCtx(handle);
+            }
+        };
+
+        public void recycle() {
+            this.totalChunks = -1;
+            this.chunkedMsgBuffer = null;
+            this.lastChunkedMessageId = -1;
+            recyclerHandle.recycle(this);
+        }
+    }
+
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
new file mode 100644
index 0000000..0a02dc3
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.google.common.collect.Sets;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.testing.TestingConnectorContext;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test read chunked messages.
+ */
+@Test
+@Slf4j
+public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest {
+
+    private final static int MAX_MESSAGE_SIZE = 1024 * 1024;
+
+    @EqualsAndHashCode
+    @Data
+    static class Movie {
+        private String name;
+        private Long publishTime;
+        private byte[] binaryData;
+    }
+
+    @EqualsAndHashCode
+    @Data
+    static class MovieMessage {
+        private Movie movie;
+        private String messageId;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setMaxMessageSize(MAX_MESSAGE_SIZE);
+        conf.setManagedLedgerMaxEntriesPerLedger(5);
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        internalSetup();
+
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+        // so that clients can test short names
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void queryTest() throws Exception {
+        String topic = "chunk-topic";
+        TopicName topicName = TopicName.get(topic);
+        int messageCnt = 20;
+        Set<MovieMessage> messageSet = prepareChunkedData(topic, messageCnt);
+        SchemaInfo schemaInfo = Schema.AVRO(Movie.class).getSchemaInfo();
+
+        PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+        connectorConfig.setWebServiceUrl(pulsar.getWebServiceAddress());
+        PulsarSplitManager pulsarSplitManager = new PulsarSplitManager(new PulsarConnectorId("1"), connectorConfig);
+        Collection<PulsarSplit> splits = pulsarSplitManager.getSplitsForTopic(
+                topicName.getPersistenceNamingEncoding(),
+                pulsar.getManagedLedgerFactory(),
+                new ManagedLedgerConfig(),
+                3,
+                new PulsarTableHandle("1", topicName.getNamespace(), topic, topic),
+                schemaInfo,
+                topic,
+                TupleDomain.all(),
+                null);
+
+        List<PulsarColumnHandle> columnHandleList = TestPulsarConnector.getColumnColumnHandles(
+                topicName, schemaInfo, PulsarColumnHandle.HandleKeyValueType.NONE, true);
+        ConnectorContext prestoConnectorContext = new TestingConnectorContext();
+
+        for (PulsarSplit split : splits) {
+            queryAndCheck(columnHandleList, split, connectorConfig, prestoConnectorContext, messageSet);
+        }
+        Assert.assertTrue(messageSet.isEmpty());
+    }
+
+    private Set<MovieMessage> prepareChunkedData(String topic, int messageCnt) throws PulsarClientException, InterruptedException {
+        pulsarClient.newConsumer(Schema.AVRO(Movie.class))
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe()
+                .close();
+        Producer<Movie> producer = pulsarClient.newProducer(Schema.AVRO(Movie.class))
+                .topic(topic)
+                .enableBatching(false)
+                .enableChunking(true)
+                .create();
+        Set<MovieMessage> messageSet = new LinkedHashSet<>();
+        CountDownLatch countDownLatch = new CountDownLatch(messageCnt);
+        for (int i = 0; i < messageCnt; i++) {
+            final double dataTimes = (i % 5) * 0.5;
+            byte[] movieBinaryData = RandomUtils.nextBytes((int) (MAX_MESSAGE_SIZE * dataTimes));
+            final int length = movieBinaryData.length;
+            final int index = i;
+
+            Movie movie = new Movie();
+            movie.setName("movie-" + i);
+            movie.setPublishTime(System.currentTimeMillis());
+            movie.setBinaryData(movieBinaryData);
+            producer.newMessage().value(movie).sendAsync()
+                    .whenComplete((msgId, throwable) -> {
+                        if (throwable != null) {
+                            log.error("Failed to produce message.", throwable);
+                            countDownLatch.countDown();
+                            return;
+                        }
+                        MovieMessage movieMessage = new MovieMessage();
+                        movieMessage.setMovie(movie);
+                        MessageIdImpl messageId = (MessageIdImpl) msgId;
+                        movieMessage.setMessageId("(" + messageId.getLedgerId() + "," + messageId.getEntryId() + ",0)");
+                        messageSet.add(movieMessage);
+                        countDownLatch.countDown();
+                    });
+        }
+        countDownLatch.await();
+        Assert.assertEquals(messageCnt, messageSet.size());
+        producer.close();
+        return messageSet;
+    }
+
+    private void queryAndCheck(List<PulsarColumnHandle> columnHandleList,
+                               PulsarSplit split,
+                               PulsarConnectorConfig connectorConfig,
+                               ConnectorContext prestoConnectorContext,
+                               Set<MovieMessage> messageSet) {
+        PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
+                columnHandleList, split, connectorConfig, pulsar.getManagedLedgerFactory(),
+                new ManagedLedgerConfig(), new PulsarConnectorMetricsTracker(new NullStatsProvider()),
+                new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager()));
+
+        AtomicInteger receiveMsgCnt = new AtomicInteger(messageSet.size());
+        while (pulsarRecordCursor.advanceNextPosition()) {
+            Movie movie = new Movie();
+            MovieMessage movieMessage = new MovieMessage();
+            movieMessage.setMovie(movie);
+            for (int i = 0; i < columnHandleList.size(); i++) {
+                switch (columnHandleList.get(i).getName()) {
+                    case "binaryData":
+                        movie.setBinaryData(pulsarRecordCursor.getSlice(i).getBytes());
+                        break;
+                    case "name":
+                        movie.setName(new String(pulsarRecordCursor.getSlice(i).getBytes()));
+                        break;
+                    case "publishTime":
+                        movie.setPublishTime(pulsarRecordCursor.getLong(i));
+                        break;
+                    case "__message_id__":
+                        movieMessage.setMessageId(new String(pulsarRecordCursor.getSlice(i).getBytes()));
+                    default:
+                        // do nothing
+                        break;
+                }
+            }
+
+            Assert.assertTrue(messageSet.contains(movieMessage));
+            messageSet.remove(movieMessage);
+            receiveMsgCnt.decrementAndGet();
+        }
+    }
+
+}

[pulsar] 01/15: [Issue #12485][Python Client] cannot use any values that evaluates to False (#12489)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8fa7139ade8cefe2ec1fcd6873204546ed75d9df
Author: Travis Sturzl <tr...@gmail.com>
AuthorDate: Wed Nov 10 05:17:39 2021 -0700

    [Issue #12485][Python Client] cannot use any values that evaluates to False (#12489)
    
    (cherry picked from commit aa59f753590ce9e6c0a7cddd1b19a89e5ef539ee)
---
 pulsar-client-cpp/python/pulsar/schema/definition.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 9b6c861..a7a235b 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -184,7 +184,7 @@ class Record(with_metaclass(RecordMeta, object)):
         return self.__class__
 
     def validate_type(self, name, val):
-        if not val and not self._required:
+        if val is None and not self._required:
             return self.default()
 
         if not isinstance(val, self.__class__):
@@ -219,7 +219,7 @@ class Field(object):
         pass
 
     def validate_type(self, name, val):
-        if not val and not self._required:
+        if val is None and not self._required:
             return self.default()
 
         if type(val) != self.python_type():
@@ -350,7 +350,7 @@ class String(Field):
     def validate_type(self, name, val):
         t = type(val)
 
-        if not val and not self._required:
+        if val is None and not self._required:
             return self.default()
 
         if not (t is str or t.__name__ == 'unicode'):

[pulsar] 08/15: [Broker] Modify return result of NamespacesBase#internalGetPublishRate (#13237)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3e076bbf8804ab1a92386c22d03b0d6e31aa65ba
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Wed Dec 15 15:54:41 2021 +0800

    [Broker] Modify return result of NamespacesBase#internalGetPublishRate (#13237)
    
    ### Motivation
    `NamespacesBase#internalGetPublishRate` should return `null` instead of throwing a `RestException`, because `null` means that the `publish-rate` is not configured.
    This matches the existing behavior of `internalGetSubscriptionDispatchRate`:
    https://github.com/apache/pulsar/blob/6d9d24d50db5418ddbb845d2c7a2be2b9ac72893/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1303-L1308
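The lookup semantics after this change can be sketched as follows. This is a minimal, self-contained model, not broker code: `PublishRateLookupSketch` and its nested `PublishRate` are hypothetical stand-ins for the namespace `Policies` map `publishMaxMessageRate` (cluster name to rate) and for Pulsar's `PublishRate` class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the patched internalGetPublishRate lookup (not the broker implementation).
class PublishRateLookupSketch {
    // Hypothetical stand-in for PublishRate: -1 in a field means that particular limit is unset.
    static final class PublishRate {
        final int publishThrottlingRateInMsg;
        final long publishThrottlingRateInByte;

        PublishRate(int msgRate, long byteRate) {
            this.publishThrottlingRateInMsg = msgRate;
            this.publishThrottlingRateInByte = byteRate;
        }
    }

    // Hypothetical stand-in for Policies.publishMaxMessageRate: cluster name -> configured rate.
    private final Map<String, PublishRate> publishMaxMessageRate = new HashMap<>();

    void setPublishRate(String cluster, PublishRate rate) {
        publishMaxMessageRate.put(cluster, rate);
    }

    // After the patch a missing entry is returned as null instead of raising HTTP 404.
    PublishRate getPublishRate(String cluster) {
        return publishMaxMessageRate.get(cluster);
    }
}
```

A caller can then distinguish three cases, as the updated `@ApiOperation` descriptions state: `null` (no publish-rate at all), a field equal to `-1` (that limit is not configured inside the publish-rate), and an actual limit.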
    
    (cherry picked from commit 3e55b4f2c40ecd73bb38962cebc4a822bfe5f1ef)
---
 .../apache/pulsar/broker/admin/impl/NamespacesBase.java   |  8 +-------
 .../org/apache/pulsar/broker/admin/v1/Namespaces.java     | 15 ++++++++++-----
 .../org/apache/pulsar/broker/admin/v2/Namespaces.java     | 15 ++++++++++-----
 .../client/api/AuthorizationProducerConsumerTest.java     |  2 ++
 4 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 87c091a..5f1231f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1197,13 +1197,7 @@ public abstract class NamespacesBase extends AdminResource {
         validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
-        PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
-        if (publishRate != null) {
-            return publishRate;
-        } else {
-            throw new RestException(Status.NOT_FOUND,
-                    "Publish-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
-        }
+        return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
     }
 
     @SuppressWarnings("deprecation")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 51728d6..afb6174 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -675,7 +675,8 @@ public class Namespaces extends NamespacesBase {
     @GET
     @Path("/{property}/{cluster}/{namespace}/publishRate")
     @ApiOperation(hidden = true,
-            value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
+            value = "Get publish-rate configured for the namespace, null means publish-rate not configured, "
+                    + "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public PublishRate getPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@@ -697,7 +698,8 @@ public class Namespaces extends NamespacesBase {
     @GET
     @Path("/{property}/{cluster}/{namespace}/dispatchRate")
     @ApiOperation(hidden = true,
-            value = "Get dispatch-rate configured for the namespace, -1 represents not configured yet")
+            value = "Get dispatch-rate configured for the namespace, null means dispatch-rate not configured, "
+                    + "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@@ -720,8 +722,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{property}/{cluster}/{namespace}/subscriptionDispatchRate")
-    @ApiOperation(value =
-            "Get Subscription dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get subscription dispatch-rate configured for the namespace, null means subscription "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String property,
@@ -746,7 +749,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate")
-    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, null means replicator "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Namespace does not exist") })
     public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e1e892c..37b7354 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -602,7 +602,8 @@ public class Namespaces extends NamespacesBase {
     @GET
     @Path("/{property}/{namespace}/publishRate")
     @ApiOperation(hidden = true,
-            value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
+            value = "Get publish-rate configured for the namespace, null means publish-rate not configured, "
+                    + "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public PublishRate getPublishRate(
@@ -634,7 +635,8 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/dispatchRate")
-    @ApiOperation(value = "Get dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get dispatch-rate configured for the namespace, null means dispatch-rate not configured, "
+            + "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
     public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
@@ -657,8 +659,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/subscriptionDispatchRate")
-    @ApiOperation(
-            value = "Get Subscription dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get subscription dispatch-rate configured for the namespace, null means subscription "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tenant,
@@ -732,7 +735,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/replicatorDispatchRate")
-    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, null means replicator "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Namespace does not exist") })
     public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index c9d76b0..62aa429 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
@@ -209,6 +210,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         superAdmin.tenants().createTenant("my-property",
                 new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
         superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertNull(superAdmin.namespaces().getPublishRate(namespace));
 
         // subscriptionRole doesn't have topic-level authorization, so it will fail to get topic stats-internal info
         try {

[pulsar] 10/15: [Transaction] Allow transaction be commit or abort in the state of aborting or committing. (#13323)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22a24d62e8d4a0da43f6b28ff911696a515b6e0b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Dec 15 13:41:02 2021 +0800

    [Transaction] Allow transaction be commit or abort in the state of aborting or committing. (#13323)
    
    Because of concurrent calls, retries, and timeout-triggered aborts, `commit` or `abort` may be invoked on a transaction that is already committing or aborting.
    This change allows `commit` while the transaction is in the `COMMITTING` state and `abort` while it is in the `ABORTING` state.
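The relaxed state checks can be sketched as below. This is a simplified, hypothetical model of the `checkIfOpenOrCommitting` / `checkIfOpenOrAborting` logic in `TransactionImpl`: it omits waiting for pending operations and the coordinator round trip, and it uses `IllegalStateException` in place of `InvalidTxnStatusException`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the client-side commit/abort state checks.
class TxnStateSketch {
    enum State { OPEN, COMMITTING, ABORTING, COMMITTED, ABORTED, ERROR }

    volatile State state = State.OPEN;

    CompletableFuture<Void> commit() {
        // commit() is accepted while OPEN and also when an earlier commit is still in progress.
        if (state != State.OPEN && state != State.COMMITTING) {
            return invalidState();
        }
        state = State.COMMITTING;
        // The real client would now wait for pending sends/acks and contact the coordinator.
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> abort() {
        // abort() is accepted while OPEN and also when an earlier abort is still in progress.
        if (state != State.OPEN && state != State.ABORTING) {
            return invalidState();
        }
        state = State.ABORTING;
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> invalidState() {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("unexpected state: " + state));
        return failed;
    }
}
```

Note that the checks remain one-directional: a transaction that is committing still rejects `abort`, and one that is aborting still rejects `commit`.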
    
    (cherry picked from commit d220c21a601a9a2c97ee3434b3b44b4bd9b8adc1)
---
 .../broker/namespace/NamespaceServiceTest.java     |  1 -
 .../pulsar/broker/transaction/TransactionTest.java | 26 ++++++++++++++++++
 .../client/impl/transaction/TransactionImpl.java   | 31 ++++++++++++++++++----
 3 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index d45dcc2..bb33ac8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -66,7 +66,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl.BundlesDataImplBuilder;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 3d0c406..90ddc4b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -76,6 +76,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -560,4 +561,29 @@ public class TransactionTest extends TransactionTestBase {
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
     }
+
+    @Test
+    public void testEndTxnWhenCommittingOrAborting() throws Exception {
+        Transaction commitTxn = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        Transaction abortTxn = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        Class<TransactionImpl> transactionClass = TransactionImpl.class;
+        Field field = transactionClass.getDeclaredField("state");
+        field.setAccessible(true);
+
+        field.set(commitTxn, TransactionImpl.State.COMMITTING);
+        field.set(abortTxn, TransactionImpl.State.ABORTING);
+
+        abortTxn.abort();
+        commitTxn.commit();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 60c7829..458976a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -144,7 +144,7 @@ public class TransactionImpl implements Transaction {
 
     @Override
     public CompletableFuture<Void> commit() {
-        return checkIfOpen().thenCompose((value) -> {
+        return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
             allOpComplete().whenComplete((v, e) -> {
@@ -172,7 +172,7 @@ public class TransactionImpl implements Transaction {
 
     @Override
     public CompletableFuture<Void> abort() {
-        return checkIfOpen().thenCompose(value -> {
+        return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
             allOpComplete().whenComplete((v, e) -> {
@@ -217,12 +217,33 @@ public class TransactionImpl implements Transaction {
         if (state == State.OPEN) {
             return CompletableFuture.completedFuture(null);
         } else {
-            return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":"
-                    + txnIdLeastBits + "] with unexpected state : "
-                    + state.name() + ", expect " + State.OPEN + " state!"));
+            return invalidTxnStatusFuture();
+        }
+    }
+
+    private CompletableFuture<Void> checkIfOpenOrCommitting() {
+        if (state == State.OPEN || state == State.COMMITTING) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return invalidTxnStatusFuture();
+        }
+    }
+
+    private CompletableFuture<Void> checkIfOpenOrAborting() {
+        if (state == State.OPEN || state == State.ABORTING) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return invalidTxnStatusFuture();
         }
     }
 
+    private CompletableFuture<Void> invalidTxnStatusFuture() {
+        return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":"
+                + txnIdLeastBits + "] with unexpected state : "
+                + state.name() + ", expect " + State.OPEN + " state!"));
+    }
+
+
     private CompletableFuture<Void> allOpComplete() {
         List<CompletableFuture<?>> futureList = new ArrayList<>();
         futureList.addAll(sendFutureList);

[pulsar] 14/15: [Transaction]Txn client check timeout (#12521)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b244d0c3bc625bb4d4d3108735fd2a219d328a50
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Dec 18 08:35:18 2021 +0800

    [Transaction]Txn client check timeout (#12521)
    
    ### Motivation
    Optimize the transaction logic on the client side.
    Avoid sending and acknowledging messages with timed-out transactions.
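Judging from the call sites in `ProducerImpl`, `PartitionedProducerImpl`, and `ConsumerBase`, the send and acknowledge paths now start with the same guard: `checkIfOpen(future)` returns `false` and fails the caller's future when the transaction can no longer be used. The sketch below models that pattern; `TxnGuardSketch` and `sendWithTxn` are hypothetical, and `IllegalStateException` stands in for `InvalidTxnStatusException`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the fail-fast guard on transactional send/ack paths.
class TxnGuardSketch {
    enum State { OPEN, TIMEOUT }

    volatile State state = State.OPEN;

    // Returns true if the transaction may still be used; otherwise fails the caller's future.
    <T> boolean checkIfOpen(CompletableFuture<T> future) {
        if (state == State.OPEN) {
            return true;
        }
        future.completeExceptionally(
                new IllegalStateException("transaction state is " + state + ", expected OPEN"));
        return false;
    }

    // Hypothetical send path: returns the already-failed future instead of contacting the broker.
    CompletableFuture<String> sendWithTxn(String payload) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (!checkIfOpen(result)) {
            return result;
        }
        result.complete("sent:" + payload); // stand-in for the real asynchronous send
        return result;
    }
}
```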
    
    ### Modifications
    
    * TransactionImpl
    
         * Add `STATE_UPDATE`, an `AtomicReferenceFieldUpdater` used to change `state` with CAS.
           **When committing or aborting, only a caller whose CAS succeeds proceeds to the subsequent checks; otherwise a failed future is returned.**
         * Implement `TimerTask` so that, when the timer fires, the task moves the transaction into the `TIMEOUT` state.
    * TransactionBuilderImpl
         * In the callback of the `build` method, use the `PulsarClient` timer to schedule a timeout for the new `TransactionImpl` (which now implements `TimerTask`).
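The CAS-plus-timer design can be sketched as follows, assuming a simplified state machine. `TimedTxnSketch` is hypothetical; it swaps Netty's `Timer`/`TimerTask` (whose callback is `run(Timeout)`) for `Runnable` and `ScheduledExecutorService` to stay dependency-free, and it only models the race between the timeout and `commit`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical model of CAS-based state transitions racing with a client-side timeout.
class TimedTxnSketch implements Runnable {
    enum State { OPEN, COMMITTING, ABORTING, COMMITTED, ABORTED, TIMEOUT }

    // A static final updater over a volatile field gives CAS without an AtomicReference per instance.
    private static final AtomicReferenceFieldUpdater<TimedTxnSketch, State> STATE_UPDATE =
            AtomicReferenceFieldUpdater.newUpdater(TimedTxnSketch.class, State.class, "state");

    private volatile State state = State.OPEN;

    State getState() {
        return state;
    }

    // Timer callback: only a transaction that is still OPEN can be marked as timed out.
    @Override
    public void run() {
        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
    }

    CompletableFuture<Void> commit() {
        // Proceed only if this caller wins OPEN -> COMMITTING, or a commit is already in progress.
        boolean proceed = STATE_UPDATE.compareAndSet(this, State.OPEN, State.COMMITTING)
                || state == State.COMMITTING;
        if (!proceed) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("unexpected state: " + state));
            return failed;
        }
        // The real client would now flush pending operations and contact the coordinator.
        return CompletableFuture.completedFuture(null);
    }

    // Mirrors the builder: create the transaction, then schedule its timeout.
    static TimedTxnSketch start(ScheduledExecutorService timer, long timeout, TimeUnit unit) {
        TimedTxnSketch txn = new TimedTxnSketch();
        timer.schedule(txn, timeout, unit);
        return txn;
    }
}
```

Whichever of the timer and `commit` performs its CAS first wins; the loser sees a non-`OPEN` state. In the real client the timer is the Netty `Timer` exposed through the `@Getter` added to `PulsarClientImpl`.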
    
    (cherry picked from commit c5d7a84c8e5c27e48022df8c7082496840cd3be9)
---
 .../client/impl/ConsumerAckResponseTest.java       |  2 +
 .../client/impl/TransactionEndToEndTest.java       | 67 +++++++++++++++++++++-
 .../TransactionCoordinatorClientException.java     | 20 +++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  4 ++
 .../client/impl/PartitionedProducerImpl.java       |  5 ++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  4 ++
 .../pulsar/client/impl/PulsarClientImpl.java       |  1 +
 .../impl/transaction/TransactionBuilderImpl.java   |  9 ++-
 .../client/impl/transaction/TransactionImpl.java   | 38 +++++++++---
 9 files changed, 137 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
index 0378c53..6981865 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
@@ -50,8 +50,10 @@ public class ConsumerAckResponseTest extends ProducerConsumerBase {
         super.producerBaseSetup();
         doReturn(1L).when(transaction).getTxnIdLeastBits();
         doReturn(1L).when(transaction).getTxnIdMostBits();
+        doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
         CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
         doNothing().when(transaction).registerAckOp(any());
+        doReturn(true).when(transaction).checkIfOpen(any());
         doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
 
         Thread.sleep(1000 * 3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 4630449..f52d319 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
@@ -770,19 +772,46 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             }
         });
 
+        Class<TransactionImpl> transactionClass = TransactionImpl.class;
+        Constructor<TransactionImpl> constructor = transactionClass
+                .getDeclaredConstructor(PulsarClientImpl.class, long.class, long.class, long.class);
+        constructor.setAccessible(true);
+
+        TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5,
+                        timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits());
+
         try {
-            timeoutTxn.commit().get();
+            timeoutTxnSkipClientTimeout.commit().get();
             fail();
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof TransactionNotFoundException);
         }
         Field field = TransactionImpl.class.getDeclaredField("state");
         field.setAccessible(true);
-        TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxn);
+        TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
         assertEquals(state, TransactionImpl.State.ERROR);
     }
 
     @Test
+    public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
+        TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .newTransaction(new TransactionCoordinatorID(0), 1).get();
+        Awaitility.await().until(() -> {
+            try {
+               getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
+                return false;
+            } catch (Exception e) {
+                return true;
+            }
+        });
+        Collection<TransactionMetadataStore> transactionMetadataStores =
+                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
+        long timeoutCount = transactionMetadataStores.stream()
+                .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
+        Assert.assertEquals(timeoutCount, 1);
+    }
+
+    @Test
     public void transactionTimeoutTest() throws Exception {
         String topic = NAMESPACE1 + "/txn-timeout";
 
@@ -943,4 +972,38 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
         assertTrue(flag);
     }
+
+    @Test
+    public void testTxnTimeOutInClient() throws Exception{
+        String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
+                .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
+                .topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();
+
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage().send();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+        });
+
+        try {
+            producer.newMessage(transaction).send();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException
+                    .InvalidTxnStatusException);
+        }
+        try {
+            Message<String> message = consumer.receive();
+            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
+                    .InvalidTxnStatusException);
+        }
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
index 0e1f6c7..d7df4e3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
@@ -68,6 +68,11 @@ public class TransactionCoordinatorClientException extends IOException {
         public InvalidTxnStatusException(String message) {
             super(message);
         }
+
+        public InvalidTxnStatusException(String txnId, String actualState, String expectState) {
+            super("["+ txnId +"] with unexpected state : "
+                    + actualState + ", expect " + expectState + " state!");
+        }
     }
 
     /**
@@ -93,6 +98,21 @@ public class TransactionCoordinatorClientException extends IOException {
         }
     }
 
+
+    /**
+     * Thrown when transaction meta was timeout.
+     */
+    public static class TransactionTimeotException extends TransactionCoordinatorClientException {
+
+        public TransactionTimeotException(Throwable t) {
+            super(t);
+        }
+
+        public TransactionTimeotException(String transactionId) {
+            super("The transaction " +  transactionId + " is timeout.");
+        }
+    }
+
     /**
      * Thrown when send request to transaction meta store but the transaction meta store handler not ready.
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1251593..0f47208 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -487,6 +487,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         if (null != txn) {
             checkArgument(txn instanceof TransactionImpl);
             txnImpl = (TransactionImpl) txn;
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+           if (!txnImpl.checkIfOpen(completableFuture)) {
+               return completableFuture;
+           }
         }
         return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3311050..216d775 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -191,6 +192,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+        if (txn != null && !((TransactionImpl)txn).checkIfOpen(completableFuture)) {
+            return completableFuture;
+        }
         int partition = routerPolicy.choosePartition(message, topicMetadata);
         checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
                 "Illegal partition index chosen by the message routing policy: " + partition);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a3ca386..5944c8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -371,6 +371,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (txn == null) {
             return internalSendAsync(message);
         } else {
+            CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+            if (!((TransactionImpl)txn).checkIfOpen(completableFuture)) {
+               return completableFuture;
+            }
             return ((TransactionImpl) txn).registerProducedTopic(topic)
                         .thenCompose(ignored -> internalSendAsync(message));
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 7703afc..c5195c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -95,6 +95,7 @@ public class PulsarClientImpl implements PulsarClient {
     protected final ClientConfigurationData conf;
     private LookupService lookup;
     private final ConnectionPool cnxPool;
+    @Getter
     private final Timer timer;
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 84be46f..3ac8676 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -67,8 +69,11 @@ public class TransactionBuilderImpl implements TransactionBuilder {
                         future.completeExceptionally(throwable);
                         return;
                     }
-                    future.complete(new TransactionImpl(client, txnTimeout,
-                            txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                    TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
+                            txnID.getLeastSigBits(), txnID.getMostSigBits());
+                    client.getTimer().newTimeout(transaction,
+                            txnTimeout, timeUnit);
+                    future.complete(transaction);
                 });
         return future;
     }
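The builder change above arms a one-shot timer as soon as the transaction is created, and `TransactionImpl.run` moves the state from OPEN to TIMEOUT with a compare-and-set. The following is a minimal stand-alone sketch of that idea, not the Pulsar code: it uses `java.util.concurrent.ScheduledExecutorService` in place of Netty's `Timer` so it has no dependencies, and the class name `TxnTimeoutSketch`, its reduced `State` enum and `beginCommit` are hypothetical. The updater is declared `static final`, the usual form for field updaters.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical sketch; not the Pulsar TransactionImpl.
public class TxnTimeoutSketch implements Runnable {
    public enum State { OPEN, COMMITTING, COMMITTED, TIMEOUT }

    // One updater shared by all instances; the target must be a volatile instance field.
    private static final AtomicReferenceFieldUpdater<TxnTimeoutSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(TxnTimeoutSketch.class, State.class, "state");

    private volatile State state = State.OPEN;

    // Timer callback: only an OPEN transaction can time out.
    @Override
    public void run() {
        STATE_UPDATER.compareAndSet(this, State.OPEN, State.TIMEOUT);
    }

    // Commit path: succeeds only if the timeout has not fired first.
    public boolean beginCommit() {
        return STATE_UPDATER.compareAndSet(this, State.OPEN, State.COMMITTING);
    }

    public State state() {
        return state;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            TxnTimeoutSketch idle = new TxnTimeoutSketch();
            // Arm the timeout when the transaction is created; get() waits for it to fire.
            timer.schedule(idle, 50, TimeUnit.MILLISECONDS).get();
            System.out.println(idle.state() + " " + idle.beginCommit()); // TIMEOUT false

            TxnTimeoutSketch busy = new TxnTimeoutSketch();
            boolean started = busy.beginCommit(); // commit starts before the timer fires
            timer.schedule(busy, 50, TimeUnit.MILLISECONDS).get();
            System.out.println(busy.state() + " " + started); // COMMITTING true
        } finally {
            timer.shutdown();
        }
    }
}
```

Because the timer task only moves OPEN to TIMEOUT, a transaction that has already started committing or aborting is left alone when the timer fires.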
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 458976a..4128a6f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
@@ -48,7 +51,7 @@ import org.apache.pulsar.common.util.FutureUtil;
  */
 @Slf4j
 @Getter
-public class TransactionImpl implements Transaction {
+public class TransactionImpl implements Transaction , TimerTask {
 
     private final PulsarClientImpl client;
     private final long transactionTimeoutMs;
@@ -63,6 +66,13 @@ public class TransactionImpl implements Transaction {
     private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
     private final ArrayList<CompletableFuture<Void>> ackFutureList;
     private volatile State state;
+    private final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
+        AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
+    }
 
     public enum State {
         OPEN,
@@ -70,7 +80,8 @@ public class TransactionImpl implements Transaction {
         ABORTING,
         COMMITTED,
         ABORTED,
-        ERROR
+        ERROR,
+        TIMEOUT
     }
 
     TransactionImpl(PulsarClientImpl client,
@@ -93,7 +104,8 @@ public class TransactionImpl implements Transaction {
 
     // register the topics that will be modified by this transaction
     public CompletableFuture<Void> registerProducedTopic(String topic) {
-        return checkIfOpen().thenCompose(value -> {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        if (checkIfOpen(completableFuture)) {
             synchronized (TransactionImpl.this) {
                 // we need to issue the request to TC to register the produced topic
                 return registerPartitionMap.compute(topic, (key, future) -> {
@@ -106,7 +118,9 @@ public class TransactionImpl implements Transaction {
                     }
                 });
             }
-        });
+        } else {
+            return completableFuture;
+        }
     }
 
     public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
@@ -115,7 +129,8 @@ public class TransactionImpl implements Transaction {
 
     // register the topics that will be modified by this transaction
     public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
-        return checkIfOpen().thenCompose(value -> {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        if (checkIfOpen(completableFuture)) {
             synchronized (TransactionImpl.this) {
                 // we need to issue the request to TC to register the acked topic
                 return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
@@ -128,7 +143,9 @@ public class TransactionImpl implements Transaction {
                     }
                 });
             }
-        });
+        } else {
+            return completableFuture;
+        }
     }
 
     public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
@@ -213,11 +230,14 @@ public class TransactionImpl implements Transaction {
         return new TxnID(txnIdMostBits, txnIdLeastBits);
     }
 
-    private CompletableFuture<Void> checkIfOpen() {
+    public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
         if (state == State.OPEN) {
-            return CompletableFuture.completedFuture(null);
+            return true;
         } else {
-            return invalidTxnStatusFuture();
+            completableFuture
+                    .completeExceptionally(new InvalidTxnStatusException(
+                            new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
+            return false;
         }
     }
 
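The reworked `checkIfOpen` no longer returns a future of its own: the caller allocates the result future, and the guard either returns `true` or fails that future and returns `false`, so nothing is registered for a transaction that is no longer OPEN. A minimal sketch of the same guard, assuming a hypothetical `CheckIfOpenSketch` class and a stand-in `InvalidStatusException` in place of `InvalidTxnStatusException`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the fail-fast guard; not the Pulsar TransactionImpl.
public class CheckIfOpenSketch {
    public enum State { OPEN, ABORTED, TIMEOUT }

    // Stand-in for InvalidTxnStatusException.
    public static class InvalidStatusException extends Exception {
        InvalidStatusException(String txnId, String actual, String expected) {
            super("[" + txnId + "] expected state " + expected + " but was " + actual);
        }
    }

    private final String txnId;
    private volatile State state = State.OPEN;

    public CheckIfOpenSketch(String txnId) {
        this.txnId = txnId;
    }

    public void setState(State newState) {
        state = newState;
    }

    // Returns true if OPEN; otherwise fails the caller's future and returns false.
    public <T> boolean checkIfOpen(CompletableFuture<T> future) {
        State current = state; // read the volatile field once
        if (current == State.OPEN) {
            return true;
        }
        future.completeExceptionally(
                new InvalidStatusException(txnId, current.name(), State.OPEN.name()));
        return false;
    }

    // Caller pattern, as in ProducerImpl: return the failed future before doing any work.
    public CompletableFuture<String> send(String payload) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (!checkIfOpen(result)) {
            return result;
        }
        result.complete("sent:" + payload); // stands in for the real asynchronous send
        return result;
    }
}
```

One small difference from the diff: the sketch reads the volatile `state` once, so the error message reports the same state the check saw.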

[pulsar] 07/15: [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect (#13135)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bff7916441e9ed89d20f25664088290cf856cf5a
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Dec 14 21:15:57 2021 +0800

    [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect (#13135)
    
    ### Motivation and Modification
    The client should not throw the following to the user to deal with:
    1. `TransactionCoordinatorNotFound` or `ManagedLedgerFencedException`
       --- retry the operation and reconnect to the TC.
    2. Requests issued while `TransactionMetaStoreHandler` is still connecting
       --- add the operation to `pendingRequests` and execute the queued requests once the connection is established.
    
    In addition, concurrent operations in `TransactionMetaStoreHandler` are hard to reason about,
    so its operations run on a single-threaded executor, `internalPinnedExecutor`.
    
    (cherry picked from commit 56323e4a5b70c3008706515acd871ba0571ec1eb)
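The queue-while-connecting and single-thread points in the message above fit together as one pattern: every handler operation runs on a single-threaded executor, and requests that arrive while the connection is not ready are parked and flushed when it opens. A minimal sketch, assuming hypothetical names (`PendingRequestSketch`, `connectionOpened`, `connectionClosed`) rather than the real `TransactionMetaStoreHandler` API:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; not the real TransactionMetaStoreHandler.
public class PendingRequestSketch {
    enum ConnState { CONNECTING, READY }

    // A single thread owns every field below, so no locks or atomics are needed.
    private final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();
    private ConnState connState = ConnState.CONNECTING;
    private final Queue<Runnable> pendingRequests = new ArrayDeque<>();

    // Submit an operation; while connecting it is parked instead of failed.
    public CompletableFuture<String> request(String op) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pinnedExecutor.execute(() -> {
            // Stands in for writing the command to the coordinator connection.
            Runnable send = () -> future.complete("done:" + op);
            if (connState == ConnState.READY) {
                send.run();
            } else {
                pendingRequests.add(send);
            }
        });
        return future;
    }

    // Connection (re)established: mark ready and flush parked requests in FIFO order.
    public void connectionOpened() {
        pinnedExecutor.execute(() -> {
            connState = ConnState.READY;
            Runnable next;
            while ((next = pendingRequests.poll()) != null) {
                next.run();
            }
        });
    }

    // Connection lost: park new requests until the next connectionOpened().
    public void connectionClosed() {
        pinnedExecutor.execute(() -> connState = ConnState.CONNECTING);
    }

    public void shutdown() {
        pinnedExecutor.shutdown();
    }
}
```

Because only the pinned thread touches `connState` and `pendingRequests`, a plain `ArrayDeque` and a non-volatile field are enough; the trade-off is that slow work must not run on that thread.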
---
 .../broker/TransactionMetadataStoreService.java    |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  68 ++-
 .../impl}/TransactionClientConnectTest.java        | 218 +++-----
 .../client/impl/TransactionMetaStoreHandler.java   | 577 ++++++++++++++-------
 4 files changed, 504 insertions(+), 367 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 240c6c9..3f3e8d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -356,7 +356,7 @@ public class TransactionMetadataStoreService {
                                         endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
 
                             }
-                            completableFuture.completeExceptionally(e);
+                            completableFuture.completeExceptionally(e.getCause());
                             return null;
                         })).exceptionally(e -> {
                     if (!isRetryableException(e.getCause())) {
@@ -371,7 +371,7 @@ public class TransactionMetadataStoreService {
                                 endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
 
                     }
-                    completableFuture.completeExceptionally(e);
+                    completableFuture.completeExceptionally(e.getCause());
                     return null;
                 });
             } else {
@@ -391,7 +391,7 @@ public class TransactionMetadataStoreService {
                             LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
                                     + "TxnAction : {}", txnID, txnAction, e);
                         }
-                        completableFuture.completeExceptionally(e);
+                        completableFuture.completeExceptionally(e.getCause());
                         return null;
                     });
                 } else {
@@ -409,7 +409,7 @@ public class TransactionMetadataStoreService {
                 transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout),
                         endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
             }
-            completableFuture.completeExceptionally(e);
+            completableFuture.completeExceptionally(e.getCause());
             return null;
         });
         return completableFuture;
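The `e` to `e.getCause()` change matters because the throwable passed to an `exceptionally` handler on a composed stage is a `CompletionException` wrapping the real error, and downstream code that checks the exception type would otherwise see only the wrapper. The difference is visible to `handle`/`whenComplete` callbacks, which receive the stored throwable as-is. A small self-contained demonstration with hypothetical names (`UnwrapCauseSketch`, `unwrap`); `unwrap` is a defensive variant that falls back to the throwable itself when it is not wrapped, whereas the diff calls `getCause()` unconditionally:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demonstration; not Pulsar code.
public class UnwrapCauseSketch {
    // Reach the exception that actually failed the stage, if it was wrapped.
    static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    // Forward the failure of a composed stage into a caller-visible future.
    static CompletableFuture<Void> forward(boolean unwrapCause) {
        CompletableFuture<Void> source = new CompletableFuture<>();
        CompletableFuture<Void> result = new CompletableFuture<>();
        source.thenCompose(v -> CompletableFuture.<Void>completedFuture(null))
                .exceptionally(e -> {
                    // e is a CompletionException wrapping the original failure.
                    result.completeExceptionally(unwrapCause ? unwrap(e) : e);
                    return null;
                });
        source.completeExceptionally(new IllegalStateException("fenced"));
        return result;
    }

    // What a handle()/whenComplete() callback on the result actually receives.
    static String seenType(CompletableFuture<Void> f) {
        return f.handle((v, ex) -> ex.getClass().getSimpleName()).join();
    }

    public static void main(String[] args) {
        System.out.println(seenType(forward(false))); // CompletionException
        System.out.println(seenType(forward(true)));  // IllegalStateException
    }
}
```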
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7762c22..30ce9c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2003,7 +2003,26 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return true;
         }
     }
+    private Throwable handleTxnException(Throwable ex, String op, long requestId) {
+        if (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex != null
+                && ex.getCause() instanceof CoordinatorException.CoordinatorNotFoundException) {
+            if (log.isDebugEnabled()) {
+                log.debug("The Coordinator was not found for the request {}", op);
+            }
+            return ex;
+        }
+        if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException || ex != null
+                && ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+            if (log.isDebugEnabled()) {
+                log.debug("Throw a CoordinatorNotFoundException to client "
+                        + "with the message got from a ManagedLedgerFencedException for the request {}", op);
+            }
+            return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage());
 
+        }
+        log.error("Send response error for {} request {}.", op, requestId, ex);
+        return ex;
+    }
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
         final long requestId = command.getRequestId();
@@ -2028,9 +2047,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(),
                             txnID.getMostSigBits()));
                 } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response error for new txn request {}", requestId, ex);
-                    }
+                    ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);
 
                     ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
                             BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
@@ -2066,19 +2083,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), txnID.getMostSigBits()));
                     } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Send response error for add published partition to txn request {}", requestId,
-                                    ex);
-                        }
+                        ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
 
-                        if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
-                            ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
-                                    BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
-                        } else {
-                            ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
-                                    BrokerServiceException.getClientErrorCode(ex.getCause()),
-                                    ex.getCause().getMessage()));
-                        }
+                        ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
+                                BrokerServiceException.getClientErrorCode(ex),
+                                ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
             }));
@@ -2105,16 +2114,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
                                 txnID.getLeastSigBits(), txnID.getMostSigBits()));
                     } else {
-                        log.error("Send response error for end txn request.", ex);
+                        ex = handleTxnException(ex, BaseCommand.Type.END_TXN.name(), requestId);
+                        ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+                                BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
 
-                        if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
-                            ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
-                                    BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
-                        } else {
-                            ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
-                                    BrokerServiceException.getClientErrorCode(ex.getCause()),
-                                    ex.getCause().getMessage()));
-                        }
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
                 });
@@ -2325,20 +2328,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 txnID.getLeastSigBits(), txnID.getMostSigBits()));
                         log.info("handle add partition to txn finish.");
                     } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Send response error for add published partition to txn request {}",
-                                    requestId, ex);
-                        }
+                        ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
 
-                        if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
-                            ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
-                                    txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
-                                    ex.getMessage()));
-                        } else {
-                            ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
-                                    txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex.getCause()),
-                                    ex.getCause().getMessage()));
-                        }
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                                txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
+                                ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
                 }));
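`handleTxnException` centralises what each transaction command reports back: a fenced managed ledger on the coordinator side is reported as `CoordinatorNotFoundException`, which the client treats as a reason to reconnect and retry, and both checks look at the exception and at its direct cause. A dependency-free sketch of the same decision, mirroring the branch structure of the diff and using hypothetical exception classes in place of the broker's `CoordinatorException` and `ManagedLedgerException` types:

```java
// Hypothetical sketch of the error mapping; not the Pulsar ServerCnx code.
public class TxnErrorMappingSketch {
    public static class CoordinatorNotFoundException extends Exception {
        public CoordinatorNotFoundException(String msg) { super(msg); }
    }

    public static class LedgerFencedException extends Exception {
        public LedgerFencedException(String msg) { super(msg); }
    }

    // True if t, or its direct cause, is an instance of type.
    static boolean isOrCausedBy(Throwable t, Class<? extends Throwable> type) {
        return t != null && (type.isInstance(t) || type.isInstance(t.getCause()));
    }

    // Map an internal failure to the exception reported to the client.
    public static Throwable toClientError(Throwable ex) {
        if (isOrCausedBy(ex, CoordinatorNotFoundException.class)) {
            return ex; // already means "reconnect and retry" to the client
        }
        if (isOrCausedBy(ex, LedgerFencedException.class)) {
            // A fenced store means this coordinator can no longer serve the request here,
            // so report "coordinator not found" to make the client reconnect.
            return new CoordinatorNotFoundException(ex.getMessage());
        }
        return ex; // any other failure is reported unchanged
    }
}
```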
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
similarity index 50%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index a51eae8..7fb924f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -16,19 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction;
-
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -39,10 +48,11 @@ import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import static org.testng.FileAssert.fail;
 
+@Slf4j
 public class TransactionClientConnectTest extends TransactionTestBase {
 
     private static final String RECONNECT_TOPIC = NAMESPACE1 + "/txn-client-reconnect-test";
@@ -60,142 +70,69 @@ public class TransactionClientConnectTest extends TransactionTestBase {
 
     @Test
     public void testTransactionNewReconnect() throws Exception {
-        start();
-
-        // when throw CoordinatorNotFoundException client will reconnect tc
-        try {
-            pulsarClient.newTransaction()
-                    .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-        reconnect();
-
-        fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-
-        // tc fence will remove this tc and reopen
-        try {
-            pulsarClient.newTransaction()
-                    .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
-            fail();
-        } catch (ExecutionException e) {
-            assertEquals(e.getCause().getMessage(),
-                    "org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException: " +
-                            "java.lang.Exception: Attempted to use a fenced managed ledger");
-        }
-
-        reconnect();
+        Callable<CompletableFuture<?>> callable = () -> pulsarClient.newTransaction()
+                .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build();
+        tryCommandReconnect(callable, callable);
     }
 
     @Test
     public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
+        Callable<CompletableFuture<?>> callable = () -> transactionCoordinatorClient
+                .addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test");
+        tryCommandReconnect(callable, callable);
+    }
 
+    public void tryCommandReconnect(Callable<CompletableFuture<?>> callable1, Callable<CompletableFuture<?>> callable2)
+            throws Exception {
+        start();
         try {
-            transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test").get();
-            fail();
+            callable1.call().get();
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
+            assertFalse(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
+            waitToReady();
+            callable1.call().get();
         }
-
-        reconnect();
         fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+        CompletableFuture<?> completableFuture = callable2.call();
         try {
-            transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test").get();
-            fail();
+            completableFuture.get(3, TimeUnit.SECONDS);
+        } catch (TimeoutException ignore) {
         } catch (ExecutionException e) {
-            if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
-            }
+            Assert.assertFalse(e.getCause()
+                    instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
         }
-        reconnect();
+
+        unFence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+        completableFuture.get();
     }
 
     @Test
     public void testTransactionAbortToTxnAsyncReconnect() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
-
-        try {
-            transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-
-        reconnect();
-        fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-        try {
-            transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
-            }
-        }
-        reconnect();
+        Callable<CompletableFuture<?>> callable1 = () -> transactionCoordinatorClient.abortAsync(new TxnID(0,
+                0));
+        Callable<CompletableFuture<?>> callable2 = () -> transactionCoordinatorClient.abortAsync(new TxnID(0,
+                1));
+        tryCommandReconnect(callable1, callable2);
     }
 
     @Test
     public void testTransactionCommitToTxnAsyncReconnect() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
-
-        try {
-            transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-
-        reconnect();
-        fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-        try {
-            transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
-            }
-        }
-        reconnect();
+        Callable<CompletableFuture<?>> callable1 = () -> transactionCoordinatorClient.commitAsync(new TxnID(0,
+                0));
+        Callable<CompletableFuture<?>> callable2 = () -> transactionCoordinatorClient.commitAsync(new TxnID(0,
+                1));
+        tryCommandReconnect(callable1, callable2);
     }
 
     @Test
     public void testTransactionAddPublishPartitionToTxnReconnect() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
-
-        try {
-            transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0),
-                    Collections.singletonList("test")).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-
-        reconnect();
-        fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-        try {
-            transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0),
-                    Collections.singletonList("test")).get();
-            fail();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
-            }
-        }
-        reconnect();
+        Callable<CompletableFuture<?>> callable = () -> transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0),
+                Collections.singletonList("test"));
+        tryCommandReconnect(callable, callable);
     }
 
     @Test
@@ -209,7 +146,11 @@ public class TransactionClientConnectTest extends TransactionTestBase {
         for (TransactionMetaStoreHandler handler : handlers) {
             handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
         }
-        pulsarClient.close();
+        for (TransactionMetaStoreHandler handler : handlers) {
+            Field stateField = HandlerState.class.getDeclaredField("state");
+            stateField.setAccessible(true);
+            stateField.set(handler, HandlerState.State.Closed);
+        }
         for (TransactionMetaStoreHandler handler : handlers) {
             Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState");
             method.setAccessible(true);
@@ -225,21 +166,14 @@ public class TransactionClientConnectTest extends TransactionTestBase {
 
     public void start() throws Exception {
         // wait transaction coordinator init success
-        Awaitility.await().until(() -> {
-            try {
-                pulsarClient.newTransaction()
-                        .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
-            } catch (Exception e) {
-                return false;
-            }
-            return true;
-        });
         pulsarClient.newTransaction()
-                .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
+                .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+        pulsarClient.newTransaction()
+                .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 getPulsarServiceList().get(0).getTransactionMetadataStoreService();
-        // remove transaction metadata store
+        // remove transaction metadap0-ta store
         transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)).get();
 
     }
@@ -250,15 +184,35 @@ public class TransactionClientConnectTest extends TransactionTestBase {
         field.set(((MLTransactionMetadataStore) transactionMetadataStoreService.getStores()
                 .get(TransactionCoordinatorID.get(0))).getManagedLedger(), ManagedLedgerImpl.State.Fenced);
     }
+    public void unFence(TransactionMetadataStoreService transactionMetadataStoreService) throws Exception {
+        Field field = ManagedLedgerImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(((MLTransactionMetadataStore) transactionMetadataStoreService.getStores()
+                .get(TransactionCoordinatorID.get(0))).getManagedLedger(), ManagedLedgerImpl.State.LedgerOpened);
+    }
 
-    public void reconnect() {
-        //reconnect
+    public void waitToReady() throws Exception{
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                getPulsarServiceList().get(0).getTransactionMetadataStoreService();
+        Class<TransactionMetadataStoreService> transactionMetadataStoreServiceClass =
+                TransactionMetadataStoreService.class;
+        Field field1 =
+                transactionMetadataStoreServiceClass.getDeclaredField("stores");
+        field1.setAccessible(true);
+        Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
+                (Map<TransactionCoordinatorID, TransactionMetadataStore>) field1
+                        .get(transactionMetadataStoreService);
         Awaitility.await().until(() -> {
-            try {
-                pulsarClient.newTransaction()
-                        .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
-            } catch (Exception e) {
-                return false;
+            for (TransactionMetadataStore transactionMetadataStore : stores.values()) {
+                Class<TransactionMetadataStoreState> transactionMetadataStoreStateClass =
+                        TransactionMetadataStoreState.class;
+                Field field = transactionMetadataStoreStateClass.getDeclaredField("state");
+                field.setAccessible(true);
+                TransactionMetadataStoreState.State state =
+                        (TransactionMetadataStoreState.State) field.get(transactionMetadataStore);
+                if (!state.equals(TransactionMetadataStoreState.State.Ready)) {
+                    return false;
+                }
             }
             return true;
         });
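The new waitToReady() helper polls until every TransactionMetadataStore reports the Ready state, and it reads the private `state` field reflectively to do so. Below is a minimal, dependency-free sketch of that predicate and polling loop. `FakeMetadataStore` and its two-value `State` enum are hypothetical stand-ins for TransactionMetadataStoreState, and the hand-written loop replaces Awaitility only to keep the example self-contained.

```java
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for TransactionMetadataStoreState: a private "state" field
// that a test can only reach through reflection.
class FakeMetadataStore {
    enum State { Initializing, Ready }

    private volatile State state = State.Initializing;

    void markReady() {
        state = State.Ready;
    }
}

class WaitToReadySketch {
    // Same reflective read the test performs with getDeclaredField + setAccessible.
    static FakeMetadataStore.State readState(FakeMetadataStore store) throws ReflectiveOperationException {
        Field field = FakeMetadataStore.class.getDeclaredField("state");
        field.setAccessible(true);
        return (FakeMetadataStore.State) field.get(store);
    }

    // The predicate handed to Awaitility.await().until(...): true only if every store is Ready.
    static boolean allReady(Collection<FakeMetadataStore> stores) {
        try {
            for (FakeMetadataStore store : stores) {
                if (readState(store) != FakeMetadataStore.State.Ready) {
                    return false;
                }
            }
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Plain polling loop standing in for Awaitility (no library dependency in this sketch).
    static boolean pollUntilAllReady(List<FakeMetadataStore> stores, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!allReady(stores)) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Reading a private field this way couples the test to an internal field name, so a rename in TransactionMetadataStoreState would surface as a NoSuchFieldException rather than a compile error.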
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index ba6ee50..b2b756a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -23,7 +23,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
+import io.netty.util.Timer;
 import io.netty.util.TimerTask;
+import java.util.concurrent.ExecutorService;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -61,6 +63,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
 
+    protected final Timer timer;
+    private final ExecutorService internalPinnedExecutor;
+
     private static class RequestTime {
         final long creationTimeMs;
         final long requestId;
@@ -96,6 +101,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             this);
         this.connectFuture = connectFuture;
         this.connectionHandler.grabCnx();
+        this.timer = pulsarClient.timer();
+        internalPinnedExecutor = pulsarClient.getInternalExecutorService();
     }
 
     @Override
@@ -109,64 +116,73 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
 
     @Override
     public void connectionOpened(ClientCnx cnx) {
-        LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.",
-            transactionCoordinatorId);
-
-        if (getState() == State.Closing || getState() == State.Closed) {
-            setState(State.Closed);
-            failPendingRequest();
-            this.pendingRequests.clear();
-            return;
-        }
-
-        connectionHandler.setClientCnx(cnx);
-        cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
-
-        // if broker protocol version < 19, don't send TcClientConnectRequest to broker.
-        if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) {
-            long requestId = client.newRequestId();
-            ByteBuf request = Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);
+        internalPinnedExecutor.execute(() -> {
+            LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.",
+                    transactionCoordinatorId);
+
+            if (getState() == State.Closing || getState() == State.Closed) {
+                setState(State.Closed);
+                failPendingRequest();
+                this.pendingRequests.clear();
+                return;
+            }
 
-            cnx.sendRequestWithId(request, requestId).thenRun(() -> {
-                LOG.info("Transaction coordinator client connect success! tcId : {}", transactionCoordinatorId);
+            connectionHandler.setClientCnx(cnx);
+            cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
+
+            // if broker protocol version < 19, don't send TcClientConnectRequest to broker.
+            if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) {
+                long requestId = client.newRequestId();
+                ByteBuf request = Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);
+
+                cnx.sendRequestWithId(request, requestId).thenRun(() -> {
+                    internalPinnedExecutor.execute(() -> {
+                        LOG.info("Transaction coordinator client connect success! tcId : {}", transactionCoordinatorId);
+                        if (!changeToReadyState()) {
+                            setState(State.Closed);
+                            cnx.channel().close();
+                        }
+
+                        if (!this.connectFuture.isDone()) {
+                            this.connectFuture.complete(null);
+                        }
+                        this.connectionHandler.resetBackoff();
+                        pendingRequests.forEach((requestID, opBase) -> checkStateAndSendRequest(opBase));
+                    });
+                }).exceptionally((e) -> {
+                    internalPinnedExecutor.execute(() -> {
+                        LOG.error("Transaction coordinator client connect fail! tcId : {}",
+                                transactionCoordinatorId, e.getCause());
+                        if (getState() == State.Closing || getState() == State.Closed
+                                || e.getCause() instanceof PulsarClientException.NotAllowedException) {
+                            setState(State.Closed);
+                            cnx.channel().close();
+                        } else {
+                            connectionHandler.reconnectLater(e.getCause());
+                        }
+                    });
+                    return null;
+                });
+            } else {
                 if (!changeToReadyState()) {
-                    setState(State.Closed);
-                    cnx.channel().close();
-                }
-
-                if (!this.connectFuture.isDone()) {
-                    this.connectFuture.complete(null);
-                }
-                this.connectionHandler.resetBackoff();
-            }).exceptionally((e) -> {
-                LOG.error("Transaction coordinator client connect fail! tcId : {}",
-                        transactionCoordinatorId, e.getCause());
-                if (getState() == State.Closing || getState() == State.Closed
-                        || e.getCause() instanceof PulsarClientException.NotAllowedException) {
-                    setState(State.Closed);
                     cnx.channel().close();
-                } else {
-                    connectionHandler.reconnectLater(e.getCause());
                 }
-                return null;
-            });
-        } else {
-            if (!changeToReadyState()) {
-                cnx.channel().close();
+                this.connectFuture.complete(null);
             }
-            this.connectFuture.complete(null);
-        }
+        });
     }
 
     private void failPendingRequest() {
-        pendingRequests.keys().forEach(k -> {
-            OpBase<?> op = pendingRequests.remove(k);
-            if (op != null && !op.callback.isDone()) {
-                op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
-                        "Could not get response from transaction meta store when " +
-                                "the transaction meta store has already close."));
-                onResponse(op);
-            }
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.keys().forEach(k -> {
+                OpBase<?> op = pendingRequests.remove(k);
+                if (op != null && !op.callback.isDone()) {
+                    op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
+                            "Could not get response from transaction meta store when " +
+                                    "the transaction meta store has already been closed."));
+                    onResponse(op);
+                }
+            });
         });
     }
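connectionOpened() and failPendingRequest() now run their bodies on `internalPinnedExecutor`, obtained from pulsarClient.getInternalExecutorService(). The apparent intent is that all reads and writes of the handler's bookkeeping happen on one thread, so they no longer race with the Netty I/O thread. A minimal sketch of that pattern follows, assuming the pinned executor behaves like a single-threaded `ExecutorService`; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical handler: all access to `pending` happens on one single-threaded executor,
// so a plain ArrayList needs no locking.
class PinnedHandlerSketch {
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    private final List<Long> pending = new ArrayList<>(); // only touched on `pinned`

    // Like the *Async methods: registration is queued rather than done on the caller thread.
    void register(long requestId) {
        pinned.execute(() -> pending.add(requestId));
    }

    // Like failPendingRequest(): drain on the pinned thread; the Future hands back a copy.
    List<Long> failAll() {
        Future<List<Long>> drained = pinned.submit(() -> {
            List<Long> copy = new ArrayList<>(pending);
            pending.clear();
            return copy;
        });
        try {
            return drained.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    void close() {
        pinned.shutdown();
    }
}
```

Because the executor runs tasks in submission order, a drain queued after three registrations always observes all three.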
 
@@ -175,42 +191,76 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout));
         }
         CompletableFuture<TxnID> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout));
-        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
         return callback;
     }
 
     void handleNewTxnResponse(CommandNewTxnResponse response) {
-        OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got new txn response for timeout {} - {}", response.getTxnidMostBits(),
-                    response.getTxnidLeastBits());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got new txn response for timeout {} - {}", txnID.getMostSigBits(),
+                            txnID.getLeastSigBits());
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got new txn response {} for request {}", txnID, response.getRequestId());
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got new txn response {} for request {}", txnID, requestId);
+                }
+                op.callback.complete(txnID);
+            } else {
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for the {} request {} error "
+                                        + "TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.NEW_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("Got {} for request {} error {}", BaseCommand.Type.NEW_TXN.name(),
+                        requestId, error);
             }
-            op.callback.complete(txnID);
-        } else {
-            LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
 
-        onResponse(op);
+            onResponse(op);
+        });
     }
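handleNewTxnResponse() (like the other handle*Response methods) copies hasError, error, message, txnID and requestId into locals on the calling thread and lets only those locals into the executor lambda. One plausible reason, stated here as an assumption, is that the decoded command object may be reused for the next frame once the handler returns. The sketch below shows why that matters; `ReusedFrame` is a hypothetical stand-in for such a reused command.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a decoded command object that the I/O layer reuses per frame.
class ReusedFrame {
    long requestId;
}

class SnapshotBeforeHopSketch {
    // Processes two frames through one reused object. With snapshot=true the request id is
    // copied into a local before the executor hop; with snapshot=false the task reads the
    // shared object only when it eventually runs.
    static List<Long> run(boolean snapshot) {
        ExecutorService pinned = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        List<Long> seen = Collections.synchronizedList(new ArrayList<>());
        ReusedFrame frame = new ReusedFrame();
        // Park the pinned thread so queued tasks run only after both frames were decoded.
        pinned.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (long id = 1; id <= 2; id++) {
            frame.requestId = id; // the "decoder" overwrites the same object
            long captured = frame.requestId;
            pinned.execute(() -> seen.add(snapshot ? captured : frame.requestId));
        }
        gate.countDown();
        pinned.shutdown();
        try {
            pinned.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }
}
```

With the snapshot the tasks see request ids 1 and 2; without it both tasks see 2, which is the mismatch the local copies avoid.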
 
     public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions) {
@@ -218,42 +268,80 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             LOG.debug("Add publish partition {} to txn {}", partitions, txnID);
         }
         CompletableFuture<Void> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddPartitionToTxn(
                 requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions);
-        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForVoidCallBack op = OpForVoidCallBack
+                .create(cmd, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
+
         return callback;
     }
 
     void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) {
-        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got add publish partition to txn response for timeout {} - {}", response.getTxnidMostBits(),
-                        response.getTxnidLeastBits());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got add publish partition to txn response for timeout {} - {}", txnID.getMostSigBits(),
+                            txnID.getLeastSigBits());
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add publish partition for request {} success.", response.getRequestId());
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Add publish partition for request {} success.", requestId);
+                }
+                op.callback.complete(null);
+            } else {
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for the {} request {} "
+                                        + "error TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("{} for request {} error {} with txnID {}.", BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
+                        requestId, error, txnID);
+
             }
-            op.callback.complete(null);
-        } else {
-            LOG.error("Add publish partition for request {} error {}.", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
 
-        onResponse(op);
+            onResponse(op);
+        });
     }
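Each *Async method now registers the op in `pendingRequests`, adds a `RequestTime` to `timeoutQueue`, and then calls checkStateAndSendRequest(), all on the pinned executor. The delayed retry re-checks `pendingRequests.containsKey(requestId)` before resending. The sketch below models that interplay, assuming the timeout task removes expired ids from the pending map. Note that a retry re-puts the op into the map but does not re-enqueue a `RequestTime`, so the original deadline still applies. All names are hypothetical, and the sweep is simplified.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical single-threaded model of pendingRequests + timeoutQueue. Every method is
// assumed to run on the pinned executor, so plain (non-concurrent) collections suffice.
class PendingRequestsSketch {
    private static final class RequestTime {
        final long creationTimeMs;
        final long requestId;

        RequestTime(long creationTimeMs, long requestId) {
            this.creationTimeMs = creationTimeMs;
            this.requestId = requestId;
        }
    }

    private final Map<Long, String> pending = new HashMap<>(); // requestId -> command name
    private final Queue<RequestTime> timeoutQueue = new ArrayDeque<>();
    private final long operationTimeoutMs;
    final List<Long> sent = new ArrayList<>(); // stands in for writes to the connection

    PendingRequestsSketch(long operationTimeoutMs) {
        this.operationTimeoutMs = operationTimeoutMs;
    }

    // Same order as the *Async methods: register, start the timeout clock, then send.
    void submit(long requestId, String command, long nowMs) {
        pending.put(requestId, command);
        timeoutQueue.add(new RequestTime(nowMs, requestId));
        sent.add(requestId);
    }

    // Expire the oldest entries; a retried request keeps its original creation time here.
    List<Long> sweepTimeouts(long nowMs) {
        List<Long> expired = new ArrayList<>();
        while (!timeoutQueue.isEmpty()
                && nowMs - timeoutQueue.peek().creationTimeMs >= operationTimeoutMs) {
            long requestId = timeoutQueue.poll().requestId;
            if (pending.remove(requestId) != null) {
                expired.add(requestId);
            }
        }
        return expired;
    }

    // Body of a delayed retry: resend only if the timeout sweep has not already removed it.
    boolean retryIfStillPending(long requestId) {
        if (!pending.containsKey(requestId)) {
            return false; // corresponds to "The request {} already timeout"
        }
        sent.add(requestId);
        return true;
    }
}
```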
 
     public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<Subscription> subscriptionList) {
@@ -262,41 +350,76 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         }
 
         CompletableFuture<Void> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddSubscriptionToTxn(
                 requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
-        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
         return callback;
     }
 
     public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
-        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add subscription to txn timeout for request {}.", response.getRequestId());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Add subscription to txn timeout for request {}.", requestId);
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add subscription to txn success for request {}.", response.getRequestId());
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Add subscription to txn success for request {}.", requestId);
+                }
+                op.callback.complete(null);
+            } else {
+                LOG.error("Add subscription to txn failed for request {} error {}.",
+                        requestId, error);
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for {} request {} error TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("{} failed for request {} error {}.", BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(),
+                       requestId, error);
+
             }
-            op.callback.complete(null);
-        } else {
-            LOG.error("Add subscription to txn failed for request {} error {}.",
-                    response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
-        onResponse(op);
+            onResponse(op);
+        });
     }
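The NEW_TXN, ADD_PARTITION_TO_TXN, ADD_SUBSCRIPTION_TO_TXN and END_TXN handlers repeat the same retry block: re-put the op, schedule on `timer`, hop back to `internalPinnedExecutor`, and resend only if the request is still pending. A sketch of how that block could be factored into one helper follows. This is a hypothetical refactoring, not code from the commit, and a `ScheduledExecutorService` stands in for the client's Netty `Timer`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for the "re-register, wait, hop to the pinned executor,
// resend if still pending" block. ScheduledExecutorService stands in for Netty's Timer.
class RetryHelperSketch implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    // The real handler touches pendingRequests on the pinned executor; the test thread
    // also removes entries here, hence ConcurrentHashMap.
    final Map<Long, Runnable> pending = new ConcurrentHashMap<>();

    void scheduleRetry(long requestId, Runnable resend, long delayMs) {
        pending.put(requestId, resend);
        timer.schedule(() -> pinned.execute(() -> {
            Runnable op = pending.get(requestId);
            if (op != null) { // null means the request already timed out
                op.run();
            }
        }), delayMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        timer.shutdownNow();
        pinned.shutdown();
    }
}
```

With such a helper, each handler's retry branch would shrink to a single call, for example `scheduleRetry(requestId, () -> checkStateAndSendRequest(op), op.backoff.next())`.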
 
     public CompletableFuture<Void> endTxnAsync(TxnID txnID, TxnAction action) {
@@ -304,68 +427,115 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             LOG.debug("End txn {}, action {}", txnID, action);
         }
         CompletableFuture<Void> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
         ByteBuf buf = Commands.serializeWithSize(cmd);
-        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        buf.retain();
-        cnx().ctx().writeAndFlush(buf, cnx().ctx().voidPromise());
+        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
         return callback;
     }
 
     void handleEndTxnResponse(CommandEndTxnResponse response) {
-        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got end txn response for timeout {} - {}", response.getTxnidMostBits(),
-                        response.getTxnidLeastBits());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got end txn response for timeout {} - {}", txnID.getMostSigBits(),
+                            txnID.getLeastSigBits());
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got end txn response success for request {}", response.getRequestId());
-            }
-            op.callback.complete(null);
-        } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got end txn response success for request {}", requestId);
+                }
+                op.callback.complete(null);
+            } else {
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for the {} request {} error "
+                                        + "TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.END_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("Got {} response for request {} error {}", BaseCommand.Type.END_TXN.name(),
+                        requestId, error);
 
-        onResponse(op);
+            }
+            onResponse(op);
+        });
     }
 
-    private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
-        if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {
-            connectionHandler.reconnectLater(new TransactionCoordinatorClientException
-                    .CoordinatorNotFoundException(message));
+
+    private boolean checkIfNeedRetryByError(ServerError error, String message, OpBase<?> op) {
+        if (error == ServerError.TransactionCoordinatorNotFound) {
+            if (getState() != State.Connecting) {
+                connectionHandler.reconnectLater(new TransactionCoordinatorClientException
+                        .CoordinatorNotFoundException(message));
+            }
+            return true;
         }
 
         if (op != null) {
             op.callback.completeExceptionally(getExceptionByServerError(error, message));
         }
+        return false;
     }
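checkIfNeedRetryByError() replaces handleTransactionFailOp() and changes the contract. TransactionCoordinatorNotFound is now treated as retryable: it returns true, triggers reconnectLater() unless the handler is already Connecting, and leaves the callback incomplete. Any other error still completes the callback exceptionally and returns false. A minimal sketch of that decision follows, with hypothetical, trimmed-down enums in place of ServerError and HandlerState.State.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down enums; the real ServerError and HandlerState.State have more values.
class RetryDecisionSketch {
    enum ServerError { TransactionCoordinatorNotFound, InvalidTxnStatus, TransactionNotFound }
    enum State { Connecting, Ready }

    // "Coordinator not found" is retryable (and triggers a reconnect unless one is already
    // running); any other error fails the callback immediately.
    static boolean checkIfNeedRetryByError(ServerError error, State state,
                                           CompletableFuture<?> callback, Runnable reconnectLater) {
        if (error == ServerError.TransactionCoordinatorNotFound) {
            if (state != State.Connecting) {
                reconnectLater.run();
            }
            return true;
        }
        callback.completeExceptionally(new IllegalStateException("Server error: " + error));
        return false;
    }
}
```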
 
     private static abstract class OpBase<T> {
         protected ByteBuf cmd;
         protected CompletableFuture<T> callback;
+        protected Backoff backoff;
 
         abstract void recycle();
     }
 
     private static class OpForTxnIdCallBack extends OpBase<TxnID> {
 
-        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback) {
+        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client) {
             OpForTxnIdCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
+            op.backoff = new BackoffBuilder()
+                    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                            TimeUnit.NANOSECONDS)
+                    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
+                    .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                    .create();
             return op;
         }
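Each op now carries its own Backoff, built from the client's initial backoff interval, a maximum of one tenth of the client's configured maximum, and setMandatoryStop(0, ...). The sketch below is a simplified doubling backoff that shows the resulting delay sequence. It is not Pulsar's Backoff class: it has no jitter and no mandatory-stop handling. For example, with a 100 ms initial interval and a 60 s configured maximum, retries would be capped at 6 s.

```java
import java.util.concurrent.TimeUnit;

// Simplified doubling backoff; NOT org.apache.pulsar.client.impl.Backoff (no jitter, no mandatory stop).
class DoublingBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private long nextMs;

    DoublingBackoffSketch(long initial, long max, TimeUnit unit) {
        this.initialMs = unit.toMillis(initial);
        this.maxMs = unit.toMillis(max);
        this.nextMs = initialMs;
    }

    // Returns the current delay, then doubles it for next time, capped at maxMs.
    long next() {
        long current = nextMs;
        nextMs = Math.min(nextMs * 2, maxMs);
        return current;
    }

    void reset() {
        nextMs = initialMs;
    }

    // Mirrors the op's configuration: the cap is one tenth of the configured client maximum.
    static DoublingBackoffSketch forTransactionOps(long initialBackoffNanos, long maxBackoffNanos) {
        return new DoublingBackoffSketch(initialBackoffNanos, maxBackoffNanos / 10, TimeUnit.NANOSECONDS);
    }
}
```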
 
@@ -375,6 +545,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
 
         @Override
         void recycle() {
+            this.backoff = null;
+            this.cmd = null;
+            this.callback = null;
             recyclerHandle.recycle(this);
         }
 
@@ -389,18 +562,29 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
 
     private static class OpForVoidCallBack extends OpBase<Void> {
 
-        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback) {
+
+        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client) {
             OpForVoidCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
+            op.backoff = new BackoffBuilder()
+                    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                            TimeUnit.NANOSECONDS)
+                    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
+                    .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                    .create();
             return op;
         }
+
         private OpForVoidCallBack(Recycler.Handle<OpForVoidCallBack> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
         @Override
         void recycle() {
+            this.backoff = null;
+            this.cmd = null;
+            this.callback = null;
             recyclerHandle.recycle(this);
         }
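Both op classes now null out `backoff`, `cmd` and `callback` in recycle() before handing the object back to the Netty Recycler. Otherwise an idle pooled object would keep the last request's buffer, future and backoff reachable. The sketch below uses a plain `ArrayDeque` as a hypothetical stand-in for io.netty.util.Recycler. It is not thread-safe and only illustrates the clear-before-return step.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical pooled op; a plain ArrayDeque stands in for io.netty.util.Recycler.
// Not thread-safe: illustration only.
final class PooledOpSketch {
    private static final ArrayDeque<PooledOpSketch> POOL = new ArrayDeque<>();

    Object cmd;
    CompletableFuture<?> callback;
    Object backoff;

    private PooledOpSketch() {
    }

    static PooledOpSketch create(Object cmd, CompletableFuture<?> callback, Object backoff) {
        PooledOpSketch op = POOL.poll();
        if (op == null) {
            op = new PooledOpSketch();
        }
        op.cmd = cmd;
        op.callback = callback;
        op.backoff = backoff;
        return op;
    }

    // Drop references first so a pooled instance does not pin the previous request's objects.
    void recycle() {
        cmd = null;
        callback = null;
        backoff = null;
        POOL.push(this);
    }
}
```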
 
@@ -433,9 +617,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     }
 
     private boolean canSendRequest(CompletableFuture<?> callback) {
-        if (!isValidHandlerState(callback)) {
-            return false;
-        }
         try {
             if (blockIfReachMaxPendingOps) {
                 semaphore.acquire();
@@ -453,81 +634,89 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         return true;
     }
 
-    private boolean isValidHandlerState(CompletableFuture<?> callback) {
+    private void checkStateAndSendRequest(OpBase<?> op) {
         switch (getState()) {
             case Ready:
-                return true;
+                ClientCnx cnx = cnx();
+                if (cnx != null) {
+                    op.cmd.retain();
+                    cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise());
+                } else {
+                    LOG.error("The cnx was null when the TC handler was ready", new NullPointerException());
+                }
+                break;
             case Connecting:
-                callback.completeExceptionally(
-                        new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
-                                "Transaction meta store handler for tcId "
-                                + transactionCoordinatorId
-                                + " is connecting now, please try later."));
-                return false;
+                break;
             case Closing:
             case Closed:
-                callback.completeExceptionally(
+                op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 "Transaction meta store handler for tcId "
                                         + transactionCoordinatorId
                                         + " is closing or closed."));
-                return false;
+                onResponse(op);
+                break;
             case Failed:
             case Uninitialized:
-                callback.completeExceptionally(
+                op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 "Transaction meta store handler for tcId "
                                         + transactionCoordinatorId
                                         + " not connected."));
-                return false;
+                onResponse(op);
+                break;
             default:
-                callback.completeExceptionally(
+                op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 transactionCoordinatorId));
-                return false;
+                onResponse(op);
+                break;
         }
     }
 
     @Override
     public void run(Timeout timeout) throws Exception {
-        if (timeout.isCancelled()) {
-            return;
-        }
-        long timeToWaitMs;
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return;
-        }
-        RequestTime peeked = timeoutQueue.peek();
-        while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()
-                - System.currentTimeMillis() <= 0) {
-            RequestTime lastPolled = timeoutQueue.poll();
-            if (lastPolled != null) {
-                OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
-                if (op != null && !op.callback.isDone()) {
-                    op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
-                            "Could not get response from transaction meta store within given timeout."));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId);
+        internalPinnedExecutor.execute(() -> {
+            if (timeout.isCancelled()) {
+                return;
+            }
+            long timeToWaitMs;
+            if (getState() == State.Closing || getState() == State.Closed) {
+                return;
+            }
+            RequestTime peeked = timeoutQueue.peek();
+            while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()
+                    - System.currentTimeMillis() <= 0) {
+                RequestTime lastPolled = timeoutQueue.poll();
+                if (lastPolled != null) {
+                    OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
+                    if (op != null && !op.callback.isDone()) {
+                        op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
+                                "Could not get response from transaction meta store within given timeout."));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId);
+                        }
+                        onResponse(op);
                     }
-                    onResponse(op);
+                } else {
+                    break;
                 }
-            } else {
-                break;
+                peeked = timeoutQueue.peek();
             }
-            peeked = timeoutQueue.peek();
-        }
 
-        if (peeked == null) {
-            timeToWaitMs = client.getConfiguration().getOperationTimeoutMs();
-        } else {
-            long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()) - System.currentTimeMillis();
-            if (diff <= 0) {
+            if (peeked == null) {
                 timeToWaitMs = client.getConfiguration().getOperationTimeoutMs();
             } else {
-                timeToWaitMs = diff;
+                long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs())
+                        - System.currentTimeMillis();
+                if (diff <= 0) {
+                    timeToWaitMs = client.getConfiguration().getOperationTimeoutMs();
+                } else {
+                    timeToWaitMs = diff;
+                }
             }
-        }
-        requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+            requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+        });
     }
 
     private ClientCnx cnx() {

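The reworked `run(Timeout)` above sweeps `timeoutQueue` in creation order, fails every request older than `operationTimeoutMs` with a `TimeoutException`, and reschedules itself for the next deadline. `checkStateAndSendRequest` now writes the command only in `Ready`, leaves it pending while `Connecting`, and fails it immediately in `Closing`, `Closed`, `Failed`, or `Uninitialized`. Below is a minimal, single-threaded Java sketch of that bookkeeping under stated assumptions. `TimeoutSweepSketch`, `submit`, `complete`, `sweep`, and the three-value `State` (the non-ready, non-connecting states are collapsed into `CLOSED`) are hypothetical names, not Pulsar APIs. The current time is passed in as a parameter, and a list of request ids stands in for writes to the connection. Only the timeout path is modeled; how the real handler treats held requests once the connection comes back is not shown in this excerpt.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of the handler's request bookkeeping (not a Pulsar API).
// Time is passed in explicitly so the logic is deterministic.
class TimeoutSweepSketch {
    enum State { READY, CONNECTING, CLOSED }

    // Entries are enqueued in creation order (assuming nowMs never decreases between calls),
    // so the head of the queue is always the oldest outstanding request.
    static final class RequestTime {
        final long requestId;
        final long creationTimeMs;
        RequestTime(long requestId, long creationTimeMs) {
            this.requestId = requestId;
            this.creationTimeMs = creationTimeMs;
        }
    }

    private final long operationTimeoutMs;
    private final ArrayDeque<RequestTime> timeoutQueue = new ArrayDeque<>();
    private final Map<Long, CompletableFuture<Void>> pending = new HashMap<>();
    private final List<Long> sent = new ArrayList<>(); // stands in for writes to the connection
    private State state = State.CONNECTING;

    TimeoutSweepSketch(long operationTimeoutMs) {
        this.operationTimeoutMs = operationTimeoutMs;
    }

    void setState(State state) { this.state = state; }
    List<Long> sent() { return sent; }

    // Registers a request, then sends, holds, or fails it depending on the handler state.
    CompletableFuture<Void> submit(long requestId, long nowMs) {
        CompletableFuture<Void> callback = new CompletableFuture<>();
        if (state == State.CLOSED) {
            callback.completeExceptionally(new IllegalStateException("handler is closed"));
            return callback;
        }
        pending.put(requestId, callback);
        timeoutQueue.add(new RequestTime(requestId, nowMs));
        if (state == State.READY) {
            sent.add(requestId);
        }
        // CONNECTING: nothing is written; in this sketch only the sweep resolves the request.
        return callback;
    }

    // Called when a response arrives; the stale queue entry is discarded by a later sweep.
    void complete(long requestId) {
        CompletableFuture<Void> callback = pending.remove(requestId);
        if (callback != null) {
            callback.complete(null);
        }
    }

    // Fails every expired request and returns the delay before the next sweep should run.
    long sweep(long nowMs) {
        RequestTime peeked = timeoutQueue.peek();
        while (peeked != null && peeked.creationTimeMs + operationTimeoutMs - nowMs <= 0) {
            timeoutQueue.poll();
            CompletableFuture<Void> callback = pending.remove(peeked.requestId);
            if (callback != null && !callback.isDone()) {
                callback.completeExceptionally(
                        new TimeoutException("request " + peeked.requestId + " timed out"));
            }
            peeked = timeoutQueue.peek();
        }
        if (peeked == null) {
            return operationTimeoutMs; // nothing outstanding: check again after a full timeout
        }
        // Always > 0 here because the loop above consumed every expired entry.
        return peeked.creationTimeMs + operationTimeoutMs - nowMs;
    }
}
```

Passing `nowMs` explicitly keeps the sweep deterministic and easy to test. The commit runs the whole sweep as one task via `internalPinnedExecutor.execute(...)`, which appears intended to keep the timer thread from touching `pendingRequests` concurrently with other handler code; this single-threaded sketch sidesteps that question entirely.
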
[pulsar] 02/15: [Config] Add readWorkerThreadsThrottlingEnabled to conf/bookkeeper.conf (#12666)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c81aec3a16139f8023c8e8021e3ce74c974212ba
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Nov 10 06:03:06 2021 +0200

    [Config] Add readWorkerThreadsThrottlingEnabled to conf/bookkeeper.conf (#12666)
    
    - https://github.com/apache/bookkeeper/pull/2646 added "Auto-throttle read operations", which is
      enabled by default
    
    (cherry picked from commit fc6d6dadaf77766189d0731196646d4c79874c8c)
---
 conf/bookkeeper.conf | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/conf/bookkeeper.conf b/conf/bookkeeper.conf
index 2fdb0fd..f26abe0 100644
--- a/conf/bookkeeper.conf
+++ b/conf/bookkeeper.conf
@@ -166,6 +166,11 @@ maxPendingReadRequestsPerThread=2500
 # avoid the executor queue to grow indefinitely
 maxPendingAddRequestsPerThread=10000
 
+# Use auto-throttling of the read-worker threads. This is done
+# to ensure the bookie is not using unlimited amount of memory
+# to respond to read-requests.
+readWorkerThreadsThrottlingEnabled=true
+
 # Option to enable busy-wait settings. Default is false.
 # WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during
 # context switches. The spinning will consume 100% CPU even when bookie is not doing any work. It is recommended to
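
The new `readWorkerThreadsThrottlingEnabled=true` setting turns on the read auto-throttling added in apache/bookkeeper#2646, which the config comment says keeps the bookie from using an unlimited amount of memory to answer read requests. BookKeeper's actual mechanism is not part of this diff and is not reproduced here. As a hedged illustration of the general idea only, the Java sketch below uses a generic technique: a `java.util.concurrent.Semaphore` that caps the number of in-flight reads, so the memory held by outstanding responses stays bounded. `ThrottledReader` and its methods are hypothetical. The blocking variant resembles the `semaphore.acquire()` call that `canSendRequest` makes when `blockIfReachMaxPendingOps` is set.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical illustration of read throttling, not BookKeeper's implementation:
// at most maxInFlight reads may run at once.
class ThrottledReader {
    private final Semaphore permits;

    ThrottledReader(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocking variant: the caller waits (back-pressure) until a permit is free.
    <T> T read(Supplier<T> doRead) throws InterruptedException {
        permits.acquire();
        try {
            return doRead.get();
        } finally {
            permits.release();
        }
    }

    // Non-blocking variant: returns null when throttled, so the caller can reject or retry.
    <T> T tryRead(Supplier<T> doRead) {
        if (!permits.tryAcquire()) {
            return null;
        }
        try {
            return doRead.get();
        } finally {
            permits.release();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Blocking trades latency for a hard memory bound, while the non-blocking variant pushes the decision to the caller; which trade-off BookKeeper makes is defined by the upstream PR, not by this sketch.
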