You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 13:13:52 UTC

[pulsar] branch branch-2.9 updated (11c01cd -> 91a2b12)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 11c01cd  Fix java doc for MultipleListenerValidator (#12389)
     new 2014457  Websocket should pass the encryption context to the consumers (#12539)
     new c3459ef  [Python Client] Python client support using custom Avro schema definition (#12516)
     new eabd17d  Update Producer stats on producer close() (#12500)
     new 4d29ea6  [pulsar-admin] Modify exception of set-properties for namespace (#12436)
     new f769226  The count of topics on the bundle is less than 2,skip split (#12527)
     new 849cfe4  Optimize Tests (#12560)
     new 33b5d5f  [CAPI] Support setting priority for consumers (#12526)
     new ed51905  [Transaction]Fix maxReadPosition with normal publish (#12386)
     new 038bb43  [C++] Fix request timeout for GetLastMessageId doesn't work (#12586)
     new 10bbe0a  [Transaction] Merge transactionBuffer exception into a class (#12358)
     new 0b5430a  [docs] Fix doc for pulsar-admin bookies cmd (#12542)
     new efb36fe  fix log typo in NamespaceService#checkHeartbeatNamespace and NamespaceService#checkHeartbeatNamespaceV2 (#12582)
     new 2cc110e  Fix additional servlets nar might extract to null   directory (#12585)
     new 7e97e6c  Fix invalid firstSentAt value (#12588)
     new 91a2b12  [Test] Cleanup ProxyPublishConsumeTest (#12607)

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../web/plugin/servlet/AdditionalServlets.java     |  21 ++--
 .../broker/TransactionMetadataStoreService.java    |   6 +-
 .../loadbalance/impl/BundleSplitterTask.java       |   4 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   4 +-
 .../PersistentDispatcherMultipleConsumers.java     |   4 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   4 +-
 .../streamingdispatch/StreamingEntryReader.java    |   4 +-
 .../buffer/TransactionBufferReader.java            |   5 +-
 .../broker/transaction/buffer/TransactionMeta.java |   6 +-
 .../exceptions/EndOfTransactionException.java      |  31 -----
 .../NoTxnsCommittedAtLedgerException.java          |  31 -----
 .../TransactionBufferProviderException.java        |  30 -----
 .../exceptions/TransactionNotFoundException.java   |  31 -----
 .../exceptions/TransactionNotSealedException.java  |  31 -----
 .../exceptions/TransactionSealedException.java     |  33 ------
 .../exceptions/TransactionStatusException.java     |  37 ------
 .../exceptions/UnsupportedTxnActionException.java  |  34 ------
 .../buffer/impl/InMemTransactionBuffer.java        |  48 ++++----
 .../buffer/impl/InMemTransactionBufferReader.java  |   4 +-
 .../buffer/impl/TopicTransactionBuffer.java        |   7 +-
 .../exception/TransactionException.java            |  83 +++++++++++++
 .../buffer/TransactionBufferException.java         |  91 +++++++++++++++
 .../buffer}/package-info.java                      |   2 +-
 .../TransactionCoordinatorException.java}          |  29 ++++-
 .../coordinator}/package-info.java                 |   2 +-
 .../exceptions => exception}/package-info.java     |   2 +-
 .../TransactionPendingAckException.java}           |  25 +++-
 .../pendingack}/package-info.java                  |   2 +-
 ...ransactionPendingAckStoreProviderException.java |  32 ------
 .../pendingack/impl/MLPendingAckStoreProvider.java |   5 +-
 .../TopicTransactionBufferRecoverTest.java         |  36 +-----
 .../transaction/TransactionClientConnectTest.java  |  31 +----
 .../broker/transaction/TransactionConsumeTest.java |  17 ++-
 .../broker/transaction/TransactionProduceTest.java |  42 +------
 .../pulsar/broker/transaction/TransactionTest.java | 107 ++++++++++++-----
 .../broker/transaction/TransactionTestBase.java    |  69 ++++++++---
 .../buffer/InMemTransactionBufferReaderTest.java   |   4 +-
 .../transaction/buffer/TransactionBufferTest.java  |  16 ++-
 .../buffer/TransactionLowWaterMarkTest.java        |  39 +------
 .../buffer/TransactionStablePositionTest.java      |  26 +----
 .../TransactionMetaStoreAssignmentTest.java        |  14 +--
 .../pendingack/PendingAckInMemoryDeleteTest.java   |  43 +------
 .../pendingack/PendingAckPersistentTest.java       |  35 +-----
 .../client/impl/TransactionEndToEndTest.java       |  38 +-----
 .../websocket/proxy/ProxyPublishConsumeTest.java   |  73 ++++++++++--
 .../websocket/proxy/SimpleConsumerSocket.java      |   3 +
 .../include/pulsar/c/consumer_configuration.h      |   6 +
 pulsar-client-cpp/lib/ClientConnection.cc          |   7 +-
 pulsar-client-cpp/lib/Future.h                     |  32 +++---
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc |  10 ++
 pulsar-client-cpp/python/examples/company.avsc     |  19 +++
 .../python/pulsar/schema/schema_avro.py            |  29 +++--
 pulsar-client-cpp/python/schema_test.py            | 128 ++++++++++++++++++++-
 pulsar-client-cpp/tests/PromiseTest.cc             |  84 ++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdBookies.java    |   4 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |   9 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   2 +-
 .../client/impl/ProducerStatsRecorderImpl.java     |  90 ++++++++-------
 .../client/impl/PartitionedProducerImplTest.java   |  33 ++++++
 .../client/impl/ProducerStatsRecorderImplTest.java |  20 ++++
 .../apache/pulsar/websocket/ConsumerHandler.java   |   1 +
 .../pulsar/websocket/data/ConsumerMessage.java     |   3 +
 62 files changed, 930 insertions(+), 788 deletions(-)
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/EndOfTransactionException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotFoundException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotSealedException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionSealedException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/TransactionException.java
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/TransactionBufferException.java
 copy pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/{buffer/exceptions => exception/buffer}/package-info.java (93%)
 copy pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/{buffer/exceptions/TransactionBufferException.java => exception/coordinator/TransactionCoordinatorException.java} (51%)
 copy pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/{buffer/exceptions => exception/coordinator}/package-info.java (92%)
 copy pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/{buffer/exceptions => exception}/package-info.java (93%)
 rename pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/{buffer/exceptions/TransactionBufferException.java => exception/pendingack/TransactionPendingAckException.java} (58%)
 rename pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/{buffer/exceptions => exception/pendingack}/package-info.java (92%)
 delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/TransactionPendingAckStoreProviderException.java
 create mode 100644 pulsar-client-cpp/python/examples/company.avsc
 create mode 100644 pulsar-client-cpp/tests/PromiseTest.cc

[pulsar] 06/15: Optimize Tests (#12560)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 849cfe4ac6bf366bce2a15a379e0fae0a14a7600
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Nov 1 23:36:38 2021 +0800

    Optimize Tests (#12560)
    
    (cherry picked from commit cb703cab9b58bc7fe2da5694e2404538c44dab7a)
---
 .../TopicTransactionBufferRecoverTest.java         | 36 +----------
 .../transaction/TransactionClientConnectTest.java  | 31 +---------
 .../broker/transaction/TransactionProduceTest.java | 42 ++-----------
 .../pulsar/broker/transaction/TransactionTest.java | 26 +-------
 .../broker/transaction/TransactionTestBase.java    | 69 ++++++++++++++++------
 .../buffer/TransactionLowWaterMarkTest.java        | 39 +-----------
 .../buffer/TransactionStablePositionTest.java      | 26 +-------
 .../TransactionMetaStoreAssignmentTest.java        | 14 +----
 .../pendingack/PendingAckInMemoryDeleteTest.java   | 43 +-------------
 .../pendingack/PendingAckPersistentTest.java       | 35 ++---------
 .../client/impl/TransactionEndToEndTest.java       | 38 +-----------
 11 files changed, 72 insertions(+), 327 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 3607b45..335cecc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction;
 
-import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -55,10 +53,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
@@ -75,8 +70,6 @@ import static org.testng.Assert.assertTrue;
 @Slf4j
 public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String RECOVER_COMMIT = NAMESPACE1 + "/recover-commit";
     private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort";
     private static final String SUBSCRIPTION_NAME = "test-recover";
@@ -85,36 +78,9 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createNonPartitionedTopic(RECOVER_COMMIT);
+        setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
         admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
         admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
-
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
index 42eadfe..a51eae8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
@@ -18,20 +18,14 @@
  */
 package org.apache.pulsar.broker.transaction;
 
-import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
@@ -51,33 +45,12 @@ import static org.testng.FileAssert.fail;
 
 public class TransactionClientConnectTest extends TransactionTestBase {
 
-    private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
+    private static final String RECONNECT_TOPIC = NAMESPACE1 + "/txn-client-reconnect-test";
     private static final int NUM_PARTITIONS = 1;
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
-        setBrokerCount(1);
-        super.internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace("public/txn", 10);
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
+        setUpBase(1, NUM_PARTITIONS, RECONNECT_TOPIC, 0);
         admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest);
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
-
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0e63f53..cbae03b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -50,10 +49,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -69,9 +65,6 @@ import org.testng.annotations.Test;
 public class TransactionProduceTest extends TransactionTestBase {
 
     private static final int TOPIC_PARTITION = 3;
-
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String PRODUCE_COMMIT_TOPIC = NAMESPACE1 + "/produce-commit";
     private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 + "/produce-abort";
     private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
@@ -79,37 +72,10 @@ public class TransactionProduceTest extends TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3);
-        admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
-        admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
-        admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);
-
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+        setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, TOPIC_PARTITION);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 12a6b28..31d8113 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -86,36 +86,12 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final int NUM_BROKERS = 1;
     private static final int NUM_PARTITIONS = 1;
 
     @BeforeMethod
     protected void setup() throws Exception {
-        this.setBrokerCount(NUM_BROKERS);
-        this.internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
-                .serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
-        pulsarClient.close();
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+       setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 1dba73a..e13365d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
 import java.util.ArrayList;
@@ -45,10 +46,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -61,6 +62,7 @@ import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 
 @Slf4j
 public abstract class TransactionTestBase extends TestRetrySupport {
@@ -83,6 +85,9 @@ public abstract class TransactionTestBase extends TestRetrySupport {
     private OrderedExecutor bkExecutor;
     private NonClosableMockBookKeeper mockBookKeeper;
 
+    public static final String TENANT = "tnx";
+    protected static final String NAMESPACE1 = TENANT + "/ns1";
+
     public void internalSetup() throws Exception {
         incrementSetupNumber();
         init();
@@ -108,6 +113,40 @@ public abstract class TransactionTestBase extends TestRetrySupport {
         mockBookKeeper = createMockBookKeeper(bkExecutor);
         startBroker();
     }
+    protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int numPartitions) throws Exception{
+        setBrokerCount(numBroker);
+        internalSetup();
+
+        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
+        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:"
+                + webServicePort).build());
+
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), numPartitionsOfTC);
+        if (topic != null) {
+            admin.tenants().createTenant(TENANT,
+                    new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+            admin.namespaces().createNamespace(NAMESPACE1);
+            if (numPartitions == 0) {
+                admin.topics().createNonPartitionedTopic(topic);
+            } else {
+                admin.topics().createPartitionedTopic(topic, numPartitions);
+            }
+        }
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
+        }
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+        // wait tc init success to ready state
+        waitForCoordinatorToBeAvailable(numPartitionsOfTC);
+    }
 
     protected void startBroker() throws Exception {
         for (int i = 0; i < brokerCount; i++) {
@@ -295,20 +334,12 @@ public abstract class TransactionTestBase extends TestRetrySupport {
     }
     public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
         // wait tc init success to ready state
-        Awaitility.await().until(() -> {
-            Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
-                    getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores();
-            if (stores.size() == numOfTCPerBroker) {
-                for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
-                    if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState()
-                            != TransactionMetadataStoreState.State.Ready) {
-                        return false;
-                    }
-                }
-                return true;
-            } else {
-                return false;
-            }
-        });
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    int transactionMetaStoreCount = pulsarServiceList.stream()
+                            .mapToInt(pulsarService -> pulsarService.getTransactionMetadataStoreService().getStores().size())
+                            .sum();
+                    Assert.assertEquals(transactionMetaStoreCount, numOfTCPerBroker);
+                });
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index db9d407..873509f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -23,17 +23,12 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
-import javax.validation.constraints.AssertTrue;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,7 +42,6 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -55,17 +49,13 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -80,35 +70,12 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionLowWaterMarkTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC = NAMESPACE1 + "/test-topic";
 
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createNonPartitionedTopic(TOPIC);
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
+        setUpBase(1, 16, TOPIC, 0);
+
         Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
                 getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
         Awaitility.await().until(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index e43f262..ef1c761 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -54,35 +54,11 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionStablePositionTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC = NAMESPACE1 + "/test-topic";
 
     @BeforeMethod
     protected void setup() throws Exception {
-        internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createNonPartitionedTopic(TOPIC);
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
+        setUpBase(1, 16, TOPIC, 0);
         Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient)
                 .getTcClient().getState() == TransactionCoordinatorClient.State.READY);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 1725305..0102786 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -20,15 +20,11 @@ package org.apache.pulsar.broker.transaction.coordinator;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -41,15 +37,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
     @Override
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
-        setBrokerCount(3);
-        super.internalSetup();
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+        setUpBase(3, 16, null, 0);
         pulsarClient.close();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index fc952c4..bc22473 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction.pendingack;
 
-import com.google.common.collect.Sets;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -35,21 +34,11 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -58,7 +47,6 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -71,39 +59,10 @@ import static org.testng.Assert.assertTrue;
 @Test(groups = "broker")
 public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-        Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
-                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+        setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 3820ebc..97f8f51d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -62,36 +62,13 @@ import org.testng.annotations.Test;
 @Slf4j
 public class PendingAckPersistentTest extends TransactionTestBase {
 
-    private static final String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay";
-
-    private static final String NAMESPACE = "public/txn";
+    private static final String PENDING_ACK_REPLAY_TOPIC = NAMESPACE1 + "/pending-ack-replay";
 
     private static final int NUM_PARTITIONS = 16;
 
     @BeforeMethod
     public void setup() throws Exception {
-        setBrokerCount(1);
-        super.internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterDataImpl.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE, 10);
-        admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC);
-
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+        setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -312,7 +289,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         String subName = "test-delete";
 
         String topic = TopicName.get(TopicDomain.persistent.toString(),
-                NamespaceName.get(NAMESPACE), "test-delete").toString();
+                NamespaceName.get(NAMESPACE1), "test-delete").toString();
         @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
@@ -325,7 +302,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
 
         admin.topics().deleteSubscription(topic, subName);
 
-        List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+        List<String> topics = admin.namespaces().getTopics(NAMESPACE1);
 
         TopicStats topicStats = admin.topics().getStats(topic, false);
 
@@ -341,7 +318,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         String subName2 = "test-delete";
 
         String topic = TopicName.get(TopicDomain.persistent.toString(),
-                NamespaceName.get(NAMESPACE), "test-delete").toString();
+                NamespaceName.get(NAMESPACE1), "test-delete").toString();
         @Cleanup
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                 .topic(topic)
@@ -364,7 +341,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
 
         admin.topics().delete(topic);
 
-        List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+        List<String> topics = admin.namespaces().getTopics(NAMESPACE1);
 
         assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName1)));
         assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 9c30f4a..4630449 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,9 +23,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -63,11 +60,8 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -87,42 +81,14 @@ import org.testng.annotations.Test;
 public class TransactionEndToEndTest extends TransactionTestBase {
 
     private static final int TOPIC_PARTITION = 3;
-
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
     private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION);
+        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
         admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
-
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.close();
-        }
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);    }
+    }
 
     @AfterMethod(alwaysRun = true)
     protected void cleanup() {

[pulsar] 12/15: fix log typo in NamespaceService#checkHeartbeatNamespace and NamespaceService#checkHeartbeatNamespaceV2 (#12582)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit efb36feec70e57cf3856f10d384b7ed8b7de370a
Author: Jason918 <ja...@qq.com>
AuthorDate: Thu Nov 4 22:44:04 2021 +0800

    fix log typo in NamespaceService#checkHeartbeatNamespace and NamespaceService#checkHeartbeatNamespaceV2 (#12582)
    
    (cherry picked from commit 6afbee6d0e7a6d21eb0fca400a19e38cbbad102e)
---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dc8f383..f6cba9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1323,7 +1323,7 @@ public class NamespaceService implements AutoCloseable {
     public static String checkHeartbeatNamespace(ServiceUnitId ns) {
         Matcher m = HEARTBEAT_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
         if (m.matches()) {
-            LOG.debug("SLAMonitoring namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
+            LOG.debug("Heartbeat namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
             return String.format("http://%s", m.group(1));
         } else {
             return null;
@@ -1333,7 +1333,7 @@ public class NamespaceService implements AutoCloseable {
     public static String checkHeartbeatNamespaceV2(ServiceUnitId ns) {
         Matcher m = HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(ns.getNamespaceObject().toString());
         if (m.matches()) {
-            LOG.debug("SLAMonitoring namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
+            LOG.debug("Heartbeat namespace v2 matched the lookup namespace {}", ns.getNamespaceObject().toString());
             return String.format("http://%s", m.group(1));
         } else {
             return null;

[pulsar] 05/15: The count of topics on the bundle is less than 2,skip split (#12527)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f7692269f9155b3da9e9a2e0ff028dbaa7e4b4fc
Author: chenlin <15...@qq.com>
AuthorDate: Mon Nov 1 16:01:32 2021 +0800

    The count of topics on the bundle is less than 2,skip split (#12527)
    
    The count of topics on the bundle is less than 2,skip split
    
    (cherry picked from commit 384c5dce8b9948ef8d01a20d18f3fd310e63f091)
---
 .../org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index e81fb50..fa48618 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -72,8 +72,8 @@ public class BundleSplitterTask implements BundleSplitStrategy {
             for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
                 final String bundle = entry.getKey();
                 final NamespaceBundleStats stats = entry.getValue();
-                if (stats.topics == 1) {
-                    log.info("namespace bundle {} only have 1 topic", bundle);
+                if (stats.topics < 2) {
+                    log.info("The count of topics on the bundle {} is less than 2,skip split!", bundle);
                     continue;
                 }
                 double totalMessageRate = 0;

[pulsar] 08/15: [Transaction]Fix maxReadPosition with normal publish (#12386)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ed519055161f637c81349282176cb151a6dafc7a
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Nov 3 12:57:06 2021 +0800

    [Transaction]Fix maxReadPosition with normal publish (#12386)
    
    now Transaction buffer syncMaxReadPositionForNormalPublish don't wait recover success. Transaction buffer recover is asynchronous, so we need to wait buffer recover success then syncMaxReadPositionForNormalPublish when producer normal message.
    
    We should not change maxReadPosition before TransactionBuffer recovers completely.
    
    It will change when the state of TB is NoSnapshot or the state of TB is Ready  but the ongoingTxn is empty.
    
    (cherry picked from commit 32c8f865f731f739065ef900126973be258d3801)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  7 +-
 .../broker/transaction/TransactionConsumeTest.java | 17 ++++-
 .../pulsar/broker/transaction/TransactionTest.java | 81 +++++++++++++++++++++-
 3 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 78f9295..84b209c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -455,9 +455,14 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
-            if (ongoingTxns.isEmpty()) {
+            if (checkIfNoSnapshot()) {
                 maxReadPosition = position;
                 changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+            } else if (checkIfReady()) {
+                if (ongoingTxns.isEmpty()) {
+                    maxReadPosition = position;
+                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                }
             }
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 82fd1d1..d381487 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -36,12 +36,16 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -75,6 +79,11 @@ public class TransactionConsumeTest extends TransactionTestBase {
                 new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace("public/txn", 10);
         admin.topics().createNonPartitionedTopic(CONSUME_TOPIC);
+
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -229,7 +238,12 @@ public class TransactionConsumeTest extends TransactionTestBase {
 
     private List<MessageIdData> appendTransactionMessages(
             TxnID txnID, PersistentTopic topic, int transactionMsgCnt, List<String> sendMessageList)
-            throws ExecutionException, InterruptedException {
+            throws ExecutionException, InterruptedException, PulsarClientException {
+        //Change the state of TB to Ready.
+        @Cleanup
+        Producer<String> producer = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .enableTransaction(true).build()
+                .newProducer(Schema.STRING).topic(CONSUME_TOPIC).sendTimeout(0, TimeUnit.SECONDS).create();
         List<MessageIdData> positionList = new ArrayList<>();
         for (int i = 0; i < transactionMsgCnt; i++) {
             final int j = i;
@@ -239,7 +253,6 @@ public class TransactionConsumeTest extends TransactionTestBase {
                     .setTxnidMostBits(txnID.getMostSigBits())
                     .setTxnidLeastBits(txnID.getLeastSigBits())
                     .setPublishTime(System.currentTimeMillis());
-
             String msg = TXN_MSG_CONTENT + i;
             sendMessageList.add(msg);
             ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 31d8113..a237314 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -26,7 +26,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.collect.Sets;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
 import java.util.List;
@@ -42,10 +41,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -68,9 +70,7 @@ import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -409,4 +409,79 @@ public class TransactionTest extends TransactionTestBase {
             return true;
         });
     }
+
+    @Test
+    public void testMaxReadPositionForNormalPublish() throws Exception{
+        String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                  .getTopic(topic, false).get().get();
+
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
+
+        //test the state of TransactionBuffer is NoSnapshot
+        //before build Producer by pulsarClient that enables transaction.
+        Producer<String> normalProducer = noTxnClient.newProducer(Schema.STRING)
+                .producerName("testNormalPublish")
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfNoSnapshot()));
+
+        //test publishing normal messages will change maxReadPosition in the state of NoSnapshot.
+        MessageIdImpl messageId = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position.getLedgerId(), messageId.getLedgerId());
+        Assert.assertEquals(position.getEntryId(), messageId.getEntryId());
+
+        //test the state of TransactionBuffer is Ready after build Producer by pulsarClient that enables transaction.
+        Producer<String> txnProducer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("testTransactionPublish")
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
+        //test publishing txn messages will not change maxReadPosition if don`t commit or abort.
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
+        MessageIdImpl messageId1 = (MessageIdImpl) txnProducer.newMessage(transaction).value("txn message").send();
+        PositionImpl position1 = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position1.getLedgerId(), messageId.getLedgerId());
+        Assert.assertEquals(position1.getEntryId(), messageId.getEntryId());
+
+        MessageIdImpl messageId2 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position2 = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position2.getLedgerId(), messageId.getLedgerId());
+        Assert.assertEquals(position2.getEntryId(), messageId.getEntryId());
+        transaction.commit().get();
+        PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition();
+
+        Assert.assertEquals(position3.getLedgerId(), messageId2.getLedgerId());
+        Assert.assertEquals(position3.getEntryId(), messageId2.getEntryId() + 1);
+
+        //test publishing normal messages will change maxReadPosition if the state of TB
+        //is Ready and ongoingTxns is empty.
+        MessageIdImpl messageId4 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position4 = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position4.getLedgerId(), messageId4.getLedgerId());
+        Assert.assertEquals(position4.getEntryId(), messageId4.getEntryId());
+
+        //test publishing normal messages will not change maxReadPosition if the state o TB is Initializing.
+        Class<TopicTransactionBufferState> transactionBufferStateClass =
+                (Class<TopicTransactionBufferState>) topicTransactionBuffer.getClass().getSuperclass();
+        Field field = transactionBufferStateClass.getDeclaredField("state");
+        field.setAccessible(true);
+        Class<TopicTransactionBuffer> topicTransactionBufferClass = TopicTransactionBuffer.class;
+        Field maxReadPositionField = topicTransactionBufferClass.getDeclaredField("maxReadPosition");
+        maxReadPositionField.setAccessible(true);
+        field.set(topicTransactionBuffer, TopicTransactionBufferState.State.Initializing);
+        MessageIdImpl messageId5 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
+        Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
+        Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
+
+        }
 }

[pulsar] 04/15: [pulsar-admin] Modify exception of set-properties for namespace (#12436)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4d29ea63f2f3d4461e1b1d15222deda1aa993dc7
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 1 16:03:58 2021 +0800

    [pulsar-admin] Modify exception of set-properties for namespace (#12436)
    
    when I execute ./pulsar-admin namespaces set-properties --properties a=a=b test/app1
    we hope to display a readable prompt of set-properties for namespace when exception occurs.
    Modifications
    Change IllegalArgumentException to ParameterException
    
    (cherry picked from commit cd9a65ee3c648443ee13f3248add1fd4e10cae89)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index c642288..f6fa123 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2198,15 +2198,18 @@ public class CmdNamespaces extends CmdBase {
             String namespace = validateNamespace(params);
             Map<String, String> map = new HashMap<>();
             if (properties.size() == 0) {
-                throw new IllegalArgumentException("Required at least one property for the namespace.");
+                throw new ParameterException(String.format("Required at least one property for the namespace, " +
+                        "but found %d.", properties.size()));
             }
             for (String property : properties) {
                 if (!property.contains("=")) {
-                    throw new IllegalArgumentException("Invalid key value pair format.");
+                    throw new ParameterException(String.format("Invalid key value pair '%s', " +
+                            "valid format like 'a=a,b=b,c=c'.", property));
                 } else {
                     String[] keyValue = property.split("=");
                     if (keyValue.length != 2) {
-                        throw new IllegalArgumentException("Invalid key value pair format.");
+                        throw new ParameterException(String.format("Invalid key value pair '%s', " +
+                                "valid format like 'a=a,b=b,c=c'.", property));
                     }
                     map.put(keyValue[0], keyValue[1]);
                 }

[pulsar] 02/15: [Python Client] Python client support using custom Avro schema definition (#12516)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c3459ef6977505c6041faee5edbfe93cf55f1bb9
Author: ran <ga...@126.com>
AuthorDate: Sat Oct 30 08:17:45 2021 +0800

    [Python Client] Python client support using custom Avro schema definition (#12516)
    
    ### Motivation
    
    Currently, the Python client didn't support using schema definition to generate `AvroSchema`, so users couldn't use the schema definition file in the Python client.
    
    ### Modifications
    
    Add a new init-param `schema_definition` for `AvroSchema`  to support initializing the `AvroSchema` by an Avro schema definition.
    
    ```
    class AvroSchema(Schema):
        def __init__(self, record_cls, schema_definition=None):
            if record_cls is None and schema_definition is None:
                raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")
    
            if record_cls is not None:
                self._schema = record_cls.schema()
            else:
                self._schema = schema_definition
            super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
    ```
    
    ### How to use
    
    Assume that there is a company Avro schema definition file `company.avsc` like this.
    ```
    {
        "doc": "this is doc",
        "namespace": "example.avro",
        "type": "record",
        "name": "Company",
        "fields": [
            {"name": "name", "type": ["null", "string"]},
            {"name": "address", "type": ["null", "string"]},
            {"name": "employees", "type": ["null", {"type": "array", "items": {
                "type": "record",
                "name": "Employee",
                "fields": [
                    {"name": "name", "type": ["null", "string"]},
                    {"name": "age", "type": ["null", "int"]}
                ]
            }}]},
            {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
        ]
    }
    ```
    
    Users could load schema from file by `avro.schema` or `fastavro.schema`
    > refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema) or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html)
    ```
    schema_definition = load_schema("examples/company.avsc")
    # schema_definition = avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()
    avro_schema = AvroSchema(None, schema_definition=schema_definition)
    
    producer = client.create_producer(
        topic=topic,
        schema=avro_schema)
    consumer = client.subscribe(topic, 'test', schema=avro_schema)
    
    company = {
        "name": "company-name" + str(i),
        "address": 'xxx road xxx street ' + str(i),
        "employees": [
            {"name": "user" + str(i), "age": 20 + i},
            {"name": "user" + str(i), "age": 30 + i},
            {"name": "user" + str(i), "age": 35 + i},
        ],
        "labels": {
            "industry": "software" + str(i),
            "scale": ">100",
            "funds": "1000000.0"
        }
    }
    producer.send(company)
    
    msg = consumer.receive()
    # Users could get a dict object by `value()` method.
    msg.value()
    ```
    
    (cherry picked from commit 85575f4f5d8eb54516c1c02e1cfcca0d936f2e49)
---
 pulsar-client-cpp/python/examples/company.avsc     |  19 +++
 .../python/pulsar/schema/schema_avro.py            |  29 +++--
 pulsar-client-cpp/python/schema_test.py            | 128 ++++++++++++++++++++-
 3 files changed, 162 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/python/examples/company.avsc b/pulsar-client-cpp/python/examples/company.avsc
new file mode 100644
index 0000000..cdb595f
--- /dev/null
+++ b/pulsar-client-cpp/python/examples/company.avsc
@@ -0,0 +1,19 @@
+{
+    "doc": "this is doc",
+    "namespace": "example.avro",
+    "type": "record",
+    "name": "Company",
+    "fields": [
+        {"name": "name", "type": ["null", "string"]},
+        {"name": "address", "type": ["null", "string"]},
+        {"name": "employees", "type": ["null", {"type": "array", "items": {
+            "type": "record",
+            "name": "Employee",
+            "fields": [
+                {"name": "name", "type": ["null", "string"]},
+                {"name": "age", "type": ["null", "int"]}
+            ]
+        }}]},
+        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
+    ]
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index e76fc51..5861505 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -32,10 +32,15 @@ except ModuleNotFoundError:
 
 if HAS_AVRO:
     class AvroSchema(Schema):
-        def __init__(self, record_cls):
-            super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO,
-                                             record_cls.schema(), 'AVRO')
-            self._schema = record_cls.schema()
+        def __init__(self, record_cls, schema_definition=None):
+            if record_cls is None and schema_definition is None:
+                raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")
+
+            if record_cls is not None:
+                self._schema = record_cls.schema()
+            else:
+                self._schema = schema_definition
+            super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
 
         def _get_serialized_value(self, x):
             if isinstance(x, enum.Enum):
@@ -53,9 +58,14 @@ if HAS_AVRO:
                 return x
 
         def encode(self, obj):
-            self._validate_object_type(obj)
             buffer = io.BytesIO()
-            m = self.encode_dict(obj.__dict__)
+            m = obj
+            if self._record_cls is not None:
+                self._validate_object_type(obj)
+                m = self.encode_dict(obj.__dict__)
+            elif not isinstance(obj, dict):
+                raise ValueError('If using the custom schema, the record data should be dict type.')
+
             fastavro.schemaless_writer(buffer, self._schema, m)
             return buffer.getvalue()
 
@@ -68,11 +78,14 @@ if HAS_AVRO:
         def decode(self, data):
             buffer = io.BytesIO(data)
             d = fastavro.schemaless_reader(buffer, self._schema)
-            return self._record_cls(**d)
+            if self._record_cls is not None:
+                return self._record_cls(**d)
+            else:
+                return d
 
 else:
     class AvroSchema(Schema):
-        def __init__(self, _record_cls):
+        def __init__(self, _record_cls, _schema_definition):
             raise Exception("Avro library support was not found. Make sure to install Pulsar client " +
                             "with Avro support: pip3 install 'pulsar-client[avro]'")
 
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 7adbcbe..d2554da 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -25,6 +25,7 @@ import pulsar
 from pulsar.schema import *
 from enum import Enum
 import json
+from fastavro.schema import load_schema
 
 
 class SchemaTest(TestCase):
@@ -1145,12 +1146,127 @@ class SchemaTest(TestCase):
 
         client.close()
 
-    def test(self):
-        class NamespaceDemo(Record):
-            _namespace = 'xxx.xxx.xxx'
-            x = String()
-            y = Integer()
-        print('schema: ', NamespaceDemo.schema())
+    def custom_schema_test(self):
+
+        def encode_and_decode(schema_definition):
+            avro_schema = AvroSchema(None, schema_definition=schema_definition)
+
+            company = {
+                "name": "company-name",
+                "address": 'xxx road xxx street',
+                "employees": [
+                    {"name": "user1", "age": 25},
+                    {"name": "user2", "age": 30},
+                    {"name": "user3", "age": 35},
+                ],
+                "labels": {
+                    "industry": "software",
+                    "scale": ">100",
+                    "funds": "1000000.0"
+                }
+            }
+            data = avro_schema.encode(company)
+            company_decode = avro_schema.decode(data)
+            self.assertEqual(company, company_decode)
+
+        schema_definition = {
+            'doc': 'this is doc',
+            'namespace': 'example.avro',
+            'type': 'record',
+            'name': 'Company',
+            'fields': [
+                {'name': 'name', 'type': ['null', 'string']},
+                {'name': 'address', 'type': ['null', 'string']},
+                {'name': 'employees', 'type': ['null', {'type': 'array', 'items': {
+                    'type': 'record',
+                    'name': 'Employee',
+                    'fields': [
+                        {'name': 'name', 'type': ['null', 'string']},
+                        {'name': 'age', 'type': ['null', 'int']}
+                    ]
+                }}]},
+                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}
+            ]
+        }
+        encode_and_decode(schema_definition)
+        # Users could load schema from file by `fastavro.schema`
+        # Or use `avro.schema` like this `avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()`
+        encode_and_decode(load_schema("examples/company.avsc"))
+
+    def custom_schema_produce_and_consume_test(self):
+        client = pulsar.Client(self.serviceUrl)
+
+        def produce_and_consume(topic, schema_definition):
+            print('custom schema produce and consume test topic - ', topic)
+            example_avro_schema = AvroSchema(None, schema_definition=schema_definition)
+
+            producer = client.create_producer(
+                topic=topic,
+                schema=example_avro_schema)
+            consumer = client.subscribe(topic, 'test', schema=example_avro_schema)
+
+            for i in range(0, 10):
+                company = {
+                    "name": "company-name" + str(i),
+                    "address": 'xxx road xxx street ' + str(i),
+                    "employees": [
+                        {"name": "user" + str(i), "age": 20 + i},
+                        {"name": "user" + str(i), "age": 30 + i},
+                        {"name": "user" + str(i), "age": 35 + i},
+                    ],
+                    "labels": {
+                        "industry": "software" + str(i),
+                        "scale": ">100",
+                        "funds": "1000000.0"
+                    }
+                }
+                producer.send(company)
+
+            for i in range(0, 10):
+                msg = consumer.receive()
+                company = {
+                    "name": "company-name" + str(i),
+                    "address": 'xxx road xxx street ' + str(i),
+                    "employees": [
+                        {"name": "user" + str(i), "age": 20 + i},
+                        {"name": "user" + str(i), "age": 30 + i},
+                        {"name": "user" + str(i), "age": 35 + i},
+                    ],
+                    "labels": {
+                        "industry": "software" + str(i),
+                        "scale": ">100",
+                        "funds": "1000000.0"
+                    }
+                }
+                self.assertEqual(msg.value(), company)
+                consumer.acknowledge(msg)
+
+            consumer.close()
+            producer.close()
+
+        schema_definition = {
+            'doc': 'this is doc',
+            'namespace': 'example.avro',
+            'type': 'record',
+            'name': 'Company',
+            'fields': [
+                {'name': 'name', 'type': ['null', 'string']},
+                {'name': 'address', 'type': ['null', 'string']},
+                {'name': 'employees', 'type': ['null', {'type': 'array', 'items': {
+                    'type': 'record',
+                    'name': 'Employee',
+                    'fields': [
+                        {'name': 'name', 'type': ['null', 'string']},
+                        {'name': 'age', 'type': ['null', 'int']}
+                    ]
+                }}]},
+                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}
+            ]
+        }
+        produce_and_consume('custom-schema-test-1', schema_definition=schema_definition)
+        produce_and_consume('custom-schema-test-2', schema_definition=load_schema("examples/company.avsc"))
+
+        client.close()
 
 if __name__ == '__main__':
     main()

[pulsar] 14/15: Fix invalid firstSentAt value (#12588)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7e97e6cc8ddac16b26a8ab6161b91221ceac470b
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Thu Nov 4 23:15:34 2021 +0800

    Fix invalid firstSentAt value (#12588)
    
    (cherry picked from commit 2ca96e4e0e63f99af0b079f3d590bbc8aa179676)
---
 .../src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f8e59a2..e28721c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1227,7 +1227,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         "%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s",
                         te.getMessage(),
                         ns - this.createdAt,
-                        ns - this.firstSentAt,
+                        this.firstSentAt <= 0 ? ns - this.lastSentAt : ns - this.firstSentAt,
                         ns - this.lastSentAt,
                         retryCount
                     );

[pulsar] 03/15: Update Producer stats on producer close() (#12500)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eabd17d3ee4bfa239c3e14e873c571ddd4de15b5
Author: Kai Wang <wa...@gmail.com>
AuthorDate: Wed Oct 27 00:50:47 2021 +0800

    Update Producer stats on producer close() (#12500)
    
    (cherry picked from commit 848690621299353284932e9281e4689813835855)
---
 .../client/impl/ProducerStatsRecorderImpl.java     | 90 +++++++++++-----------
 .../client/impl/PartitionedProducerImplTest.java   | 33 ++++++++
 .../client/impl/ProducerStatsRecorderImplTest.java | 20 +++++
 3 files changed, 100 insertions(+), 43 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index faf73cb..6b435d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -111,49 +111,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
             }
 
             try {
-                long now = System.nanoTime();
-                double elapsed = (now - oldTime) / 1e9;
-                oldTime = now;
-
-                long currentNumMsgsSent = numMsgsSent.sumThenReset();
-                long currentNumBytesSent = numBytesSent.sumThenReset();
-                long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
-                long currentNumAcksReceived = numAcksReceived.sumThenReset();
-
-                totalMsgsSent.add(currentNumMsgsSent);
-                totalBytesSent.add(currentNumBytesSent);
-                totalSendFailed.add(currentNumSendFailedMsgs);
-                totalAcksReceived.add(currentNumAcksReceived);
-
-                synchronized (ds) {
-                    latencyPctValues = ds.getQuantiles(PERCENTILES);
-                    ds.reset();
-                }
-
-                sendMsgsRate = currentNumMsgsSent / elapsed;
-                sendBytesRate = currentNumBytesSent / elapsed;
-
-                if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
-                        | currentNumMsgsSent) != 0) {
-
-                    for (int i = 0; i < latencyPctValues.length; i++) {
-                        if (Double.isNaN(latencyPctValues[i])) {
-                            latencyPctValues[i] = 0;
-                        }
-                    }
-
-                    log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
-                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
-                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
-                            producer.getProducerName(), producer.getPendingQueueSize(),
-                            THROUGHPUT_FORMAT.format(sendMsgsRate),
-                            THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
-                            DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
-                            DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
-                            DEC.format(latencyPctValues[5]),
-                            THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
-                }
-
+                updateStats();
             } catch (Exception e) {
                 log.error("[{}] [{}]: {}", producer.getTopic(), producer.getProducerName(), e.getMessage());
             } finally {
@@ -171,6 +129,51 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
         return statTimeout;
     }
 
+    protected void updateStats() {
+        long now = System.nanoTime();
+        double elapsed = (now - oldTime) / 1e9;
+        oldTime = now;
+
+        long currentNumMsgsSent = numMsgsSent.sumThenReset();
+        long currentNumBytesSent = numBytesSent.sumThenReset();
+        long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
+        long currentNumAcksReceived = numAcksReceived.sumThenReset();
+
+        totalMsgsSent.add(currentNumMsgsSent);
+        totalBytesSent.add(currentNumBytesSent);
+        totalSendFailed.add(currentNumSendFailedMsgs);
+        totalAcksReceived.add(currentNumAcksReceived);
+
+        synchronized (ds) {
+            latencyPctValues = ds.getQuantiles(PERCENTILES);
+            ds.reset();
+        }
+
+        sendMsgsRate = currentNumMsgsSent / elapsed;
+        sendBytesRate = currentNumBytesSent / elapsed;
+
+        if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
+                | currentNumMsgsSent) != 0) {
+
+            for (int i = 0; i < latencyPctValues.length; i++) {
+                if (Double.isNaN(latencyPctValues[i])) {
+                    latencyPctValues[i] = 0;
+                }
+            }
+
+            log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
+                    producer.getProducerName(), producer.getPendingQueueSize(),
+                    THROUGHPUT_FORMAT.format(sendMsgsRate),
+                    THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
+                    DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
+                    DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
+                    DEC.format(latencyPctValues[5]),
+                    THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
+        }
+    }
+
     @Override
     public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         numMsgsSent.add(numMsgs);
@@ -297,6 +300,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
     }
 
     public void cancelStatsTimeout() {
+        this.updateStats();
         if (statTimeout != null) {
             statTimeout.cancel();
             statTimeout = null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 1f9496b..ad2c992 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -202,4 +202,37 @@ public class PartitionedProducerImplTest {
         impl.getStats();
     }
 
+    @Test
+    public void testGetStatsWithoutArriveUpdateInterval() throws Exception {
+        String topicName = "test-stats-without-arrive-interval";
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        conf.setStatsIntervalSeconds(100);
+
+        ThreadFactory threadFactory =
+                new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil
+                .newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
+
+        PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+        producerConfData.setCustomMessageRouter(new CustomMessageRouter());
+
+        assertEquals(Long.parseLong("100"), clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+        PartitionedProducerImpl<byte[]> impl = new PartitionedProducerImpl<>(
+                clientImpl, topicName, producerConfData,
+                1, null, null, null);
+
+        impl.getProducers().get(0).getStats().incrementSendFailed();
+        ProducerStatsRecorderImpl stats = impl.getStats();
+        assertEquals(stats.getTotalSendFailed(), 0);
+        // When close producer, the ProducerStatsRecorder will update stats immediately
+        impl.close();
+        stats = impl.getStats();
+        assertEquals(stats.getTotalSendFailed(), 1);
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index d654158..f6e7f28 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -54,4 +54,24 @@ public class ProducerStatsRecorderImplTest {
         Thread.sleep(1200);
         assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
     }
+
+    @Test
+    public void testGetStatsAndCancelStatsTimeoutWithoutArriveUpdateInterval() {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setStatsIntervalSeconds(60);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.getConfiguration()).thenReturn(conf);
+        Timer timer = new HashedWheelTimer();
+        when(client.timer()).thenReturn(timer);
+        ProducerImpl<?> producer = mock(ProducerImpl.class);
+        when(producer.getTopic()).thenReturn("topic-test");
+        when(producer.getProducerName()).thenReturn("producer-test");
+        when(producer.getPendingQueueSize()).thenReturn(1);
+        ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+        ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer);
+        long latencyNs = TimeUnit.SECONDS.toNanos(1);
+        recorder.incrementNumAcksReceived(latencyNs);
+        recorder.cancelStatsTimeout();
+        assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
+    }
 }

[pulsar] 09/15: [C++] Fix request timeout for GetLastMessageId doesn't work (#12586)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 038bb43e7c1e7749348807864cca63586a5a321c
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Nov 3 23:10:15 2021 +0800

    [C++] Fix request timeout for GetLastMessageId doesn't work (#12586)
    
    * Fix request timeout for GetLastMessageId doesn't work
    
    * Fix CentOS 7 build error
    
    * Revert refactors
    
    * Remove redundant clear for listeners
    
    * Use swap instead of move
    
    (cherry picked from commit a54c6c003c626cb16d90200ad81dd3ec37be2133)
---
 pulsar-client-cpp/lib/ClientConnection.cc |  7 ++-
 pulsar-client-cpp/lib/Future.h            | 32 +++++++-----
 pulsar-client-cpp/tests/PromiseTest.cc    | 84 +++++++++++++++++++++++++++++++
 3 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 2128689..3ad6f40 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)
+        .addListener([promise](Result result, const ResponseData& data) {
+            if (result != ResultOk) {
+                promise.setFailed(result);
+            }
+        });
     return promise.getFuture();
 }
 
diff --git a/pulsar-client-cpp/lib/Future.h b/pulsar-client-cpp/lib/Future.h
index cafb63f..b695e5e 100644
--- a/pulsar-client-cpp/lib/Future.h
+++ b/pulsar-client-cpp/lib/Future.h
@@ -90,7 +90,8 @@ class Promise {
    public:
     Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
 
-    bool setValue(const Type& value) {
+    bool setValue(const Type& value) const {
+        static Result DEFAULT_RESULT;
         InternalState<Result, Type>* state = state_.get();
         Lock lock(state->mutex);
 
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        decltype(state->listeners) listeners;
+        listeners.swap(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
-        state->listeners.clear();
         state->condition.notify_all();
         return true;
     }
 
-    bool setFailed(Result result) {
+    bool setFailed(Result result) const {
+        static Type DEFAULT_VALUE;
         InternalState<Result, Type>* state = state_.get();
         Lock lock(state->mutex);
 
@@ -124,13 +128,15 @@ class Promise {
         state->result = result;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        decltype(state->listeners) listeners;
+        listeners.swap(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(result, DEFAULT_VALUE);
         }
 
-        state->listeners.clear();
         state->condition.notify_all();
         return true;
     }
diff --git a/pulsar-client-cpp/tests/PromiseTest.cc b/pulsar-client-cpp/tests/PromiseTest.cc
new file mode 100644
index 0000000..73c6f8c
--- /dev/null
+++ b/pulsar-client-cpp/tests/PromiseTest.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/Future.h>
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+
+using namespace pulsar;
+
+TEST(PromiseTest, testSetValue) {
+    Promise<int, std::string> promise;
+    std::thread t{[promise] {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        promise.setValue("hello");
+    }};
+    t.detach();
+
+    std::string value;
+    ASSERT_EQ(promise.getFuture().get(value), 0);
+    ASSERT_EQ(value, "hello");
+}
+
+TEST(PromiseTest, testSetFailed) {
+    Promise<int, std::string> promise;
+    std::thread t{[promise] {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        promise.setFailed(-1);
+    }};
+    t.detach();
+
+    std::string value;
+    ASSERT_EQ(promise.getFuture().get(value), -1);
+    ASSERT_EQ(value, "");
+}
+
+TEST(PromiseTest, testListeners) {
+    Promise<int, std::string> promise;
+    auto future = promise.getFuture();
+
+    bool resultSetFailed = true;
+    bool resultSetValue = true;
+    std::vector<int> results;
+    std::vector<std::string> values;
+
+    future
+        .addListener([promise, &resultSetFailed, &results, &values](int result, const std::string& value) {
+            resultSetFailed = promise.setFailed(-1L);
+            results.emplace_back(result);
+            values.emplace_back(value);
+        })
+        .addListener([promise, &resultSetValue, &results, &values](int result, const std::string& value) {
+            resultSetValue = promise.setValue("WRONG");
+            results.emplace_back(result);
+            values.emplace_back(value);
+        });
+
+    promise.setValue("hello");
+    std::string value;
+    ASSERT_EQ(future.get(value), 0);
+    ASSERT_EQ(value, "hello");
+
+    ASSERT_FALSE(resultSetFailed);
+    ASSERT_FALSE(resultSetValue);
+    ASSERT_EQ(results, (std::vector<int>(2, 0)));
+    ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
+}

[pulsar] 01/15: Websocket should pass the encryption context to the consumers (#12539)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2014457ae2b56eb9b231bfe2c6e8bee3e656cf53
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Oct 29 17:54:27 2021 -0700

    Websocket should pass the encryption context to the consumers (#12539)
    
    (cherry picked from commit d20efe5b0becd73ef33816c4bf33eafa2e28efa4)
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 62 ++++++++++++++++++++++
 .../websocket/proxy/SimpleConsumerSocket.java      |  3 ++
 .../apache/pulsar/websocket/ConsumerHandler.java   |  1 +
 .../pulsar/websocket/data/ConsumerMessage.java     |  3 ++
 4 files changed, 69 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 3e3cf9a..6386c51 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -29,12 +29,14 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -54,9 +56,11 @@ import javax.ws.rs.core.Response;
 
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TopicType;
@@ -935,6 +939,64 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void consumeEncryptedMessages() throws Exception {
+        final String subscription = "my-sub";
+        final String topic = "my-property/my-ns/encrypted" + UUID.randomUUID();
+        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/consumer/persistent/" + topic + "/" + subscription + "?cryptoFailureAction=CONSUME";
+        final int messages = 10;
+
+        WebSocketClient consumerClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+
+        final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu [...]
+        final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...]
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .defaultCryptoKeyReader(rsaPublicKeyData)
+                .addEncryptionKey("ws-consumer-a")
+                .create();
+
+        try {
+            consumerClient.start();
+            ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);
+
+            assertTrue(consumerFuture.get().isOpen());
+            assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
+
+            for (int i = 0; i < messages; i++) {
+                producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            }
+
+            producer.flush();
+            consumeSocket.sendPermits(messages);
+            Awaitility.await().untilAsserted(() ->
+                    Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
+
+            for (JsonObject msg : consumeSocket.messages) {
+                assertTrue(msg.has("encryptionContext"));
+                JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
+                JsonObject keys = encryptionCtx.getAsJsonObject("keys");
+                assertTrue(keys.has("ws-consumer-a"));
+
+                assertTrue(keys.getAsJsonObject("ws-consumer-a").has("keyValue"));
+            }
+
+            // The message should not be acked since we only acked 1 message of the batch message
+            Awaitility.await().untilAsserted(() ->
+                    Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
+                            .get(subscription).getMsgBacklog(), 0));
+
+        } finally {
+            stopWebSocketClient(consumerClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index 749bfdcd..b1a9908 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -44,6 +44,7 @@ public class SimpleConsumerSocket {
     private final CountDownLatch closeLatch;
     private Session session;
     private final ArrayList<String> consumerBuffer;
+    final ArrayList<JsonObject> messages;
     private final AtomicInteger receivedMessages = new AtomicInteger();
     // Custom message handler to override standard message processing, if it's needed
     private SimpleConsumerMessageHandler customMessageHandler;
@@ -51,6 +52,7 @@ public class SimpleConsumerSocket {
     public SimpleConsumerSocket() {
         this.closeLatch = new CountDownLatch(1);
         consumerBuffer = new ArrayList<>();
+        this.messages = new ArrayList<>();
     }
 
     public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
@@ -79,6 +81,7 @@ public class SimpleConsumerSocket {
     public synchronized void onMessage(String msg) throws JsonParseException, IOException {
         receivedMessages.incrementAndGet();
         JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+        this.messages.add(message);
         if (message.get(X_PULSAR_MESSAGE_ID) != null) {
             String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
             consumerBuffer.add(messageId);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 54a5a62..0192188 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -160,6 +160,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
             dm.properties = msg.getProperties();
             dm.publishTime = DateFormatter.format(msg.getPublishTime());
             dm.redeliveryCount = msg.getRedeliveryCount();
+            dm.encryptionContext = msg.getEncryptionCtx().orElse(null);
             if (msg.getEventTime() != 0) {
                 dm.eventTime = DateFormatter.format(msg.getEventTime());
             }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
index 9660c95..9091a7e 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.pulsar.common.api.EncryptionContext;
 
 @JsonInclude(Include.NON_NULL)
 public class ConsumerMessage {
@@ -32,5 +33,7 @@ public class ConsumerMessage {
     public int redeliveryCount;
     public String eventTime;
 
+    public EncryptionContext encryptionContext;
+
     public String key;
 }

[pulsar] 15/15: [Test] Cleanup ProxyPublishConsumeTest (#12607)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91a2b1230223a3e134785dd09d17705fb0ebeb24
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Nov 4 09:51:35 2021 +0800

    [Test] Cleanup ProxyPublishConsumeTest (#12607)
    
    (cherry picked from commit 556ba0ea57a518af931edc935a8a6b4799ead10e)
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 23 +++++++++-------------
 1 file changed, 9 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 6386c51..b12f670 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -29,14 +29,12 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,11 +54,9 @@ import javax.ws.rs.core.Response;
 
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TopicType;
@@ -82,7 +78,6 @@ import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.logging.LoggingFeature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -295,14 +290,14 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
             Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
             consumerFuture.get();
             List<String> subs = admin.topics().getSubscriptions(topic);
-            Assert.assertEquals(subs.size(), 1);
-            Assert.assertEquals(subs.get(0), subscription);
+            assertEquals(subs.size(), 1);
+            assertEquals(subs.get(0), subscription);
             // do unsubscribe
             consumeSocket.unsubscribe();
             //wait for delete
             Thread.sleep(1000);
             subs = admin.topics().getSubscriptions(topic);
-            Assert.assertEquals(subs.size(), 0);
+            assertEquals(subs.size(), 0);
         } finally {
             stopWebSocketClient(consumeClient);
         }
@@ -927,11 +922,11 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
             producer.flush();
             consumeSocket.sendPermits(messages);
             Awaitility.await().untilAsserted(() ->
-                    Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
+                    assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
 
             // The message should not be acked since we only acked 1 message of the batch message
             Awaitility.await().untilAsserted(() ->
-                    Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
+                    assertEquals(admin.topics().getStats(topic).getSubscriptions()
                             .get(subscription).getMsgBacklog(), 0));
 
         } finally {
@@ -976,7 +971,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
             producer.flush();
             consumeSocket.sendPermits(messages);
             Awaitility.await().untilAsserted(() ->
-                    Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
+                    assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
 
             for (JsonObject msg : consumeSocket.messages) {
                 assertTrue(msg.has("encryptionContext"));
@@ -989,7 +984,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
 
             // The message should not be acked since we only acked 1 message of the batch message
             Awaitility.await().untilAsserted(() ->
-                    Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
+                    assertEquals(admin.topics().getStats(topic).getSubscriptions()
                             .get(subscription).getMsgBacklog(), 0));
 
         } finally {
@@ -1051,13 +1046,13 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         // number of consumers are connected = 2 (one is reader)
         assertEquals(stats.consumerStats.size(), 2);
         ConsumerStats consumerStats = stats.consumerStats.iterator().next();
-        // Assert.assertTrue(consumerStats.numberOfMsgDelivered > 0);
+        assertTrue(consumerStats.numberOfMsgDelivered > 0);
         assertNotNull(consumerStats.remoteConnection);
 
         // number of producers are connected = 1
         assertEquals(stats.producerStats.size(), 1);
         ProducerStats producerStats = stats.producerStats.iterator().next();
-        // Assert.assertTrue(producerStats.numberOfMsgPublished > 0);
+        assertTrue(producerStats.numberOfMsgPublished > 0);
         assertNotNull(producerStats.remoteConnection);
     }
 

[pulsar] 13/15: Fix additional servlets nar might extract to null directory (#12585)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cc110e1df8505a4bfbc9ea072ee912c9fe29463
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Thu Nov 4 21:01:24 2021 +0800

    Fix additional servlets nar might extract to null   directory (#12585)
    
    ### Motivation
    The additional servlets use NAR package to implantation plugin mechanism, it need extract to specific directory.
    
    However, the `narExtractionDirectory` is from `Properties`, but the properties has only the configuration in the
    configuration file. The default value of `narExtractionDirectory ` in `ServiceConfiguration` can't be use.
    
    ### Modifications
    When `narExtractionDirectory ` configuration is not set, use `NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR` as default directory.
    
    (cherry picked from commit 9ecd613c0fdaf7f5306fc7f633a4d12218a7a4d2)
---
 .../web/plugin/servlet/AdditionalServlets.java      | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
index 2451cf5..080e1c7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
@@ -21,13 +21,12 @@ package org.apache.pulsar.broker.web.plugin.servlet;
 import com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
 
 /**
  * A collection of loaded additional servlets.
@@ -71,18 +70,23 @@ public class AdditionalServlets implements AutoCloseable {
         if (additionalServlets == null) {
             additionalServlets = conf.getProperties().getProperty(PROXY_ADDITIONAL_SERVLETS);
         }
+
+        String narExtractionDirectory = conf.getProperties().getProperty(NAR_EXTRACTION_DIRECTORY);
+        if(narExtractionDirectory == null) {
+            narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+        }
+
         if (additionalServletDirectory == null || additionalServlets == null) {
             return null;
         }
 
         AdditionalServletDefinitions definitions =
                 AdditionalServletUtils.searchForServlets(additionalServletDirectory
-                        , null);
+                        , narExtractionDirectory);
         ImmutableMap.Builder<String, AdditionalServletWithClassLoader> builder = ImmutableMap.builder();
 
-        List<String> additionalServletsList = Arrays.asList(additionalServlets.split(","));
-        additionalServletsList.forEach(servletName -> {
-
+        String[] additionalServletsList = additionalServlets.split(",");
+        for (String servletName : additionalServletsList) {
             AdditionalServletMetadata definition = definitions.servlets().get(servletName);
             if (null == definition) {
                 throw new RuntimeException("No additional servlet is found for name `" + servletName
@@ -91,8 +95,7 @@ public class AdditionalServlets implements AutoCloseable {
 
             AdditionalServletWithClassLoader servletWithClassLoader;
             try {
-                servletWithClassLoader = AdditionalServletUtils.load(definition,
-                        conf.getProperties().getProperty(NAR_EXTRACTION_DIRECTORY));
+                servletWithClassLoader = AdditionalServletUtils.load(definition, narExtractionDirectory);
                 if (servletWithClassLoader != null) {
                     builder.put(servletName, servletWithClassLoader);
                 }
@@ -101,7 +104,7 @@ public class AdditionalServlets implements AutoCloseable {
                 log.error("Failed to load the additional servlet for name `" + servletName + "`", e);
                 throw new RuntimeException("Failed to load the additional servlet for name `" + servletName + "`");
             }
-        });
+        }
 
         Map<String, AdditionalServletWithClassLoader> servlets = builder.build();
         if (servlets != null && !servlets.isEmpty()) {

[pulsar] 10/15: [Transaction] Merge transactionBuffer exception into a class (#12358)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 10bbe0a0bb09f3c626cfe94eca765a3806466474
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Nov 4 23:03:13 2021 +0800

    [Transaction] Merge transactionBuffer exception into a class (#12358)
    
    (cherry picked from commit 15a23030a7162ee5d5613d1b3bd4f483c1e8100f)
---
 .../broker/TransactionMetadataStoreService.java    |  6 +-
 .../PersistentDispatcherMultipleConsumers.java     |  4 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  4 +-
 .../streamingdispatch/StreamingEntryReader.java    |  4 +-
 .../buffer/TransactionBufferReader.java            |  5 +-
 .../broker/transaction/buffer/TransactionMeta.java |  6 +-
 .../NoTxnsCommittedAtLedgerException.java          | 31 --------
 .../exceptions/TransactionNotSealedException.java  | 31 --------
 .../exceptions/TransactionSealedException.java     | 33 --------
 .../exceptions/TransactionStatusException.java     | 37 ---------
 .../exceptions/UnsupportedTxnActionException.java  | 34 --------
 .../buffer/impl/InMemTransactionBuffer.java        | 48 ++++++------
 .../buffer/impl/InMemTransactionBufferReader.java  |  4 +-
 .../exception/TransactionException.java            | 83 ++++++++++++++++++++
 .../buffer/TransactionBufferException.java         | 91 ++++++++++++++++++++++
 .../buffer}/package-info.java                      |  2 +-
 .../TransactionCoordinatorException.java}          | 29 +++++--
 .../coordinator/package-info.java}                 | 12 +--
 .../package-info.java}                             | 13 +---
 .../TransactionPendingAckException.java}           | 25 ++++--
 .../pendingack/package-info.java}                  | 13 +---
 ...ransactionPendingAckStoreProviderException.java | 32 --------
 .../pendingack/impl/MLPendingAckStoreProvider.java |  5 +-
 .../buffer/InMemTransactionBufferReaderTest.java   |  4 +-
 .../transaction/buffer/TransactionBufferTest.java  | 16 ++--
 25 files changed, 278 insertions(+), 294 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 31a8714..607f05e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
+import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
 import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
 import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerPersistenceException;
@@ -331,8 +331,8 @@ public class TransactionMetadataStoreService {
                 newStatus = ABORTING;
                 break;
             default:
-                UnsupportedTxnActionException exception =
-                        new UnsupportedTxnActionException(txnID, txnAction);
+                TransactionCoordinatorException.UnsupportedTxnActionException exception =
+                        new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
                 LOG.error(exception.getMessage());
                 completableFuture.completeExceptionally(exception);
                 return completableFuture;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 80e3d60..0d06005 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -55,7 +55,7 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -610,7 +610,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // Notify the consumer only if all the messages were already acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 653c1c1..848cbb4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -469,7 +469,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 // Notify the consumer only if all the messages were already acknowledged
                 consumers.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", name,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 12f5600..85083da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -37,7 +37,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.impl.Backoff;
 
 /**
@@ -197,7 +197,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
         PositionImpl readPosition = pendingReadEntryRequest.position;
         pendingReadEntryRequest.retry++;
         long waitTimeMillis = readFailureBackoff.next();
-        if (exception.getCause() instanceof TransactionNotSealedException) {
+        if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferReader.java
index 4547de0..54dafd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferReader.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.broker.transaction.buffer;
 import com.google.common.annotations.Beta;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
+
 
 /**
  * A reader to read entries of a given transaction from transaction buffer.
@@ -38,7 +39,7 @@ public interface TransactionBufferReader extends AutoCloseable {
      *
      * @param numEntries the number of entries to read from transaction buffer.
      * @return a future represents the result of the read operations.
-     * @throws EndOfTransactionException if reaching end of the transaction and no
+     * @throws TransactionBufferException.EndOfTransactionException if reaching end of the transaction and no
      *         more entries to return.
      */
     CompletableFuture<List<TransactionEntry>> readNext(int numEntries);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
index 9afc05e..347ab74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
@@ -22,7 +22,7 @@ import com.google.common.annotations.Beta;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 
@@ -57,9 +57,9 @@ public interface TransactionMeta {
      * Return messages number in one transaction.
      *
      * @return the number of transaction messages
-     * @throws TransactionStatusException
+     * @throws TransactionBufferException.TransactionStatusException
      */
-    int numMessageInTxn() throws TransactionStatusException;
+    int numMessageInTxn() throws TransactionBufferException.TransactionStatusException;
 
     /**
      * Return the committed ledger id at data ledger.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
deleted file mode 100644
index b20b29a..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-/**
- * Exception is thrown when no transactions found committed at a given ledger.
- */
-public class NoTxnsCommittedAtLedgerException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public NoTxnsCommittedAtLedgerException(String message) {
-        super(message);
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotSealedException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotSealedException.java
deleted file mode 100644
index c732c88..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotSealedException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-/**
- * Exception is thrown when opening a reader on a transaction that is not sealed yet.
- */
-public class TransactionNotSealedException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public TransactionNotSealedException(String message) {
-        super(message);
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionSealedException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionSealedException.java
deleted file mode 100644
index 13fdec5..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionSealedException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-/**
- * Exception thrown if a transaction is already sealed.
- *
- * <p>If a transaction is sealed, no more entries should be appended to this transaction.
- */
-public class TransactionSealedException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 5366602873819540477L;
-
-    public TransactionSealedException(String message) {
-        super(message);
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
deleted file mode 100644
index 008f27f..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
-
-/**
- * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
- */
-public class TransactionStatusException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public TransactionStatusException(TxnID txnId,
-                                      TxnStatus expectedStatus,
-                                      TxnStatus actualStatus) {
-        super("Transaction `" + txnId + "` is not in an expected status `" + expectedStatus
-            + "`, but is in status `" + actualStatus + "`");
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java
deleted file mode 100644
index e4a8341..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.common.api.proto.TxnAction;
-
-/**
- * Exceptions are thrown when txnAction is unsupported.
- */
-public class UnsupportedTxnActionException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public UnsupportedTxnActionException(TxnID txnId, int txnAction) {
-        super("Transaction `" + txnId + "` receive unsupported txnAction " + TxnAction.valueOf(txnAction));
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 2c32d0b..f66b263 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -36,10 +36,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -86,7 +83,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
         }
 
         @Override
-        public int numMessageInTxn() throws TransactionStatusException {
+        public int numMessageInTxn() throws TransactionBufferException.TransactionStatusException {
             return -1;
         }
 
@@ -125,7 +122,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
         public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
             try {
                 return CompletableFuture.completedFuture(commitAt(committedAtLedgerId, committedAtEntryId));
-            } catch (TransactionStatusException e) {
+            } catch (TransactionBufferException.TransactionStatusException e) {
                 return FutureUtil.failedFuture(e);
             }
         }
@@ -134,23 +131,23 @@ class InMemTransactionBuffer implements TransactionBuffer {
         public CompletableFuture<TransactionMeta> abortTxn() {
             try {
                 return CompletableFuture.completedFuture(abort());
-            } catch (TransactionStatusException e) {
+            } catch (TransactionBufferException.TransactionStatusException e) {
                 return FutureUtil.failedFuture(e);
             }
         }
 
-        synchronized TxnBuffer abort() throws TransactionStatusException {
+        synchronized TxnBuffer abort() throws TransactionBufferException.TransactionStatusException {
             if (TxnStatus.OPEN != status) {
-                throw new TransactionStatusException(txnid, TxnStatus.OPEN, status);
+                throw new TransactionBufferException.TransactionStatusException(txnid, TxnStatus.OPEN, status);
             }
             this.status = TxnStatus.ABORTED;
             return this;
         }
 
         synchronized TxnBuffer commitAt(long committedAtLedgerId, long committedAtEntryId)
-                throws TransactionStatusException {
+                throws TransactionBufferException.TransactionStatusException {
             if (TxnStatus.OPEN != status) {
-                throw new TransactionStatusException(txnid, TxnStatus.OPEN, status);
+                throw new TransactionBufferException.TransactionStatusException(txnid, TxnStatus.OPEN, status);
             }
 
             this.committedAtLedgerId = committedAtLedgerId;
@@ -170,11 +167,13 @@ class InMemTransactionBuffer implements TransactionBuffer {
             }
         }
 
-        public void appendEntry(long sequenceId, ByteBuf entry) throws TransactionSealedException {
+        public void appendEntry(long sequenceId, ByteBuf entry) throws
+                TransactionBufferException.TransactionSealedException {
             synchronized (this) {
                 if (TxnStatus.OPEN != status) {
                     // the transaction is not open anymore, reject the append operations
-                    throw new TransactionSealedException("Transaction `" + txnid + "` is already sealed");
+                    throw new TransactionBufferException
+                            .TransactionSealedException("Transaction `" + txnid + "` is already sealed");
                 }
             }
 
@@ -183,11 +182,13 @@ class InMemTransactionBuffer implements TransactionBuffer {
             }
         }
 
-        public TransactionBufferReader newReader(long sequenceId) throws TransactionNotSealedException {
+        public TransactionBufferReader newReader(long sequenceId) throws
+                TransactionBufferException.TransactionNotSealedException {
             synchronized (this) {
                 if (TxnStatus.COMMITTED != status) {
                     // the transaction is not committed yet, hence the buffer is not sealed
-                    throw new TransactionNotSealedException("Transaction `" + txnid + "` is not sealed yet");
+                    throw new TransactionBufferException
+                            .TransactionNotSealedException("Transaction `" + txnid + "` is not sealed yet");
                 }
             }
 
@@ -220,17 +221,17 @@ class InMemTransactionBuffer implements TransactionBuffer {
         CompletableFuture<TransactionMeta> getFuture = new CompletableFuture<>();
         try {
             getFuture.complete(getTxnBufferOrThrowNotFoundException(txnID));
-        } catch (TransactionNotFoundException e) {
+        } catch (TransactionBufferException.TransactionNotFoundException e) {
             getFuture.completeExceptionally(e);
         }
         return getFuture;
     }
 
     private TxnBuffer getTxnBufferOrThrowNotFoundException(TxnID txnID)
-            throws TransactionNotFoundException {
+            throws TransactionBufferException.TransactionNotFoundException {
         TxnBuffer buffer = buffers.get(txnID);
         if (null == buffer) {
-            throw new TransactionNotFoundException(
+            throw new TransactionBufferException.TransactionNotFoundException(
                 "Transaction `" + txnID + "` doesn't exist in the transaction buffer");
         }
         return buffer;
@@ -262,7 +263,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
         try {
             txnBuffer.appendEntry(sequenceId, buffer);
             appendFuture.complete(null);
-        } catch (TransactionSealedException e) {
+        } catch (TransactionBufferException.TransactionSealedException e) {
             appendFuture.completeExceptionally(e);
         }
         return appendFuture;
@@ -276,7 +277,8 @@ class InMemTransactionBuffer implements TransactionBuffer {
             TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
             TransactionBufferReader reader = txnBuffer.newReader(startSequenceId);
             openFuture.complete(reader);
-        } catch (TransactionNotFoundException | TransactionNotSealedException e) {
+        } catch (TransactionBufferException.TransactionNotFoundException
+                | TransactionBufferException.TransactionNotSealedException e) {
             openFuture.completeExceptionally(e);
         }
         return openFuture;
@@ -295,7 +297,8 @@ class InMemTransactionBuffer implements TransactionBuffer {
                 addTxnToTxnIdex(txnID, committedAtLedgerId);
             }
             commitFuture.complete(null);
-        } catch (TransactionNotFoundException | TransactionStatusException e) {
+        } catch (TransactionBufferException.TransactionNotFoundException
+                | TransactionBufferException.TransactionStatusException e) {
             commitFuture.completeExceptionally(e);
         }
         return commitFuture;
@@ -318,7 +321,8 @@ class InMemTransactionBuffer implements TransactionBuffer {
             txnBuffer.abort();
             buffers.remove(txnID, txnBuffer);
             abortFuture.complete(null);
-        } catch (TransactionNotFoundException | TransactionStatusException e) {
+        } catch (TransactionBufferException.TransactionNotFoundException
+                | TransactionBufferException.TransactionStatusException e) {
             abortFuture.completeExceptionally(e);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java
index 91b095e..d81ce02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java
@@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 
 /**
@@ -78,7 +78,7 @@ public class InMemTransactionBufferReader implements TransactionBufferReader {
         }
 
         if (txnEntries.isEmpty()) {
-            readFuture.completeExceptionally(new EndOfTransactionException(
+            readFuture.completeExceptionally(new TransactionBufferException.EndOfTransactionException(
                 "No more entries found in transaction `" + txnId + "`"
             ));
         } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/TransactionException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/TransactionException.java
new file mode 100644
index 0000000..f81fc8a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/TransactionException.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.exception;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+/**
+ * The base exception class for the errors thrown from Transaction.
+ */
+public abstract class TransactionException extends Exception {
+
+    private static final long serialVersionUID = 0L;
+
+    public TransactionException(String message) {
+        super(message);
+    }
+
+    public TransactionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TransactionException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Exception is thrown when opening a reader on a transaction that is not sealed yet.
+     */
+    public static class TransactionNotSealedException extends TransactionException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionNotSealedException(String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Exception thrown if a transaction is already sealed.
+     *
+     * <p>If a transaction is sealed, no more entries should be appended to this transaction.
+     */
+    public static class TransactionSealedException extends TransactionException {
+
+        private static final long serialVersionUID = 5366602873819540477L;
+
+        public TransactionSealedException(String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
+     */
+    public static class TransactionStatusException extends TransactionException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionStatusException(TxnID txnId,
+                                          TxnStatus expectedStatus,
+                                          TxnStatus actualStatus) {
+            super("Transaction `q" + txnId + "` is not in an expected status `" + expectedStatus
+                    + "`, but is in status `" + actualStatus + "`");
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/TransactionBufferException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/TransactionBufferException.java
new file mode 100644
index 0000000..ab301c9
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/TransactionBufferException.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.exception.buffer;
+
+import org.apache.pulsar.broker.transaction.exception.TransactionException;
+
+/**
+ * The base exception class for the errors thrown from Transaction Buffer.
+ */
+public abstract class TransactionBufferException extends TransactionException {
+
+    private static final long serialVersionUID = 0L;
+
+    public TransactionBufferException(String message) {
+        super(message);
+    }
+
+    public TransactionBufferException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TransactionBufferException(Throwable cause) {
+        super(cause);
+    }
+
+
+    /**
+     * Exception thrown when reaching end of a transaction.
+     */
+    public static class EndOfTransactionException extends TransactionBufferException {
+
+        private static final long serialVersionUID = 0L;
+
+        public EndOfTransactionException(String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Exception is thrown when no transactions found committed at a given ledger.
+     */
+    public class NoTxnsCommittedAtLedgerException extends TransactionBufferException {
+
+        private static final long serialVersionUID = 0L;
+
+        public NoTxnsCommittedAtLedgerException(String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Transaction buffer provider exception.
+     */
+    public class TransactionBufferProviderException extends TransactionBufferException {
+
+        public TransactionBufferProviderException(String message) {
+            super(message);
+        }
+
+    }
+
+    /**
+     * Exception is thrown when the transaction is not found in the transaction buffer.
+     */
+    public static class TransactionNotFoundException extends TransactionBufferException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionNotFoundException(String message) {
+            super(message);
+        }
+    }
+
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/package-info.java
similarity index 93%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/package-info.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/package-info.java
index 2aee740..60bb5ee2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/package-info.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/buffer/package-info.java
@@ -19,4 +19,4 @@
 /**
  * Exceptions thrown when encountering errors in transaction buffer.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
+package org.apache.pulsar.broker.transaction.exception.buffer;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/coordinator/TransactionCoordinatorException.java
similarity index 51%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferException.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/coordinator/TransactionCoordinatorException.java
index 3ecbf2b..8da04ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/coordinator/TransactionCoordinatorException.java
@@ -16,24 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
+package org.apache.pulsar.broker.transaction.exception.coordinator;
+
+import org.apache.pulsar.broker.transaction.exception.TransactionException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
 
 /**
- * The base exception class for the errors thrown from Transaction Buffer.
+ * The base exception class for the errors thrown from Transaction Coordinator.
  */
-public abstract class TransactionBufferException extends Exception {
+public abstract class TransactionCoordinatorException extends TransactionException {
 
     private static final long serialVersionUID = 0L;
 
-    public TransactionBufferException(String message) {
+    public TransactionCoordinatorException(String message) {
         super(message);
     }
 
-    public TransactionBufferException(String message, Throwable cause) {
+    public TransactionCoordinatorException(String message, Throwable cause) {
         super(message, cause);
     }
 
-    public TransactionBufferException(Throwable cause) {
+    public TransactionCoordinatorException(Throwable cause) {
         super(cause);
     }
+
+
+    /**
+     * Exceptions are thrown when txnAction is unsupported.
+     */
+    public static class UnsupportedTxnActionException extends TransactionCoordinatorException {
+
+        private static final long serialVersionUID = 0L;
+
+        public UnsupportedTxnActionException(TxnID txnId, int txnAction) {
+            super("Transaction `" + txnId + "` receive unsupported txnAction " + TxnAction.valueOf(txnAction));
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/coordinator/package-info.java
similarity index 73%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/coordinator/package-info.java
index e7a7a62..ceaff6a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/coordinator/package-info.java
@@ -16,15 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
 /**
- * Transaction buffer provider exception.
+ * Exceptions thrown when encountering errors in transaction buffer.
  */
-public class TransactionBufferProviderException extends TransactionBufferException {
-
-    public TransactionBufferProviderException(String message) {
-        super(message);
-    }
-
-}
+package org.apache.pulsar.broker.transaction.exception.coordinator;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/EndOfTransactionException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/package-info.java
similarity index 70%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/EndOfTransactionException.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/package-info.java
index 57fce9a..222d871 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/EndOfTransactionException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/package-info.java
@@ -16,16 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
 /**
- * Exception thrown when reaching end of a transaction.
+ * Exceptions thrown when encountering errors in transaction buffer.
  */
-public class EndOfTransactionException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public EndOfTransactionException(String message) {
-        super(message);
-    }
-}
+package org.apache.pulsar.broker.transaction.exception;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/pendingack/TransactionPendingAckException.java
similarity index 58%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferException.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/pendingack/TransactionPendingAckException.java
index 3ecbf2b..74999bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/pendingack/TransactionPendingAckException.java
@@ -16,24 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
+package org.apache.pulsar.broker.transaction.exception.pendingack;
+
+import org.apache.pulsar.broker.transaction.exception.TransactionException;
 
 /**
- * The base exception class for the errors thrown from Transaction Buffer.
+ * The base exception class for the errors thrown from Transaction Pending ACk.
  */
-public abstract class TransactionBufferException extends Exception {
+public abstract class TransactionPendingAckException extends TransactionException {
 
     private static final long serialVersionUID = 0L;
 
-    public TransactionBufferException(String message) {
+    public TransactionPendingAckException(String message) {
         super(message);
     }
 
-    public TransactionBufferException(String message, Throwable cause) {
+    public TransactionPendingAckException(String message, Throwable cause) {
         super(message, cause);
     }
 
-    public TransactionBufferException(Throwable cause) {
+    public TransactionPendingAckException(Throwable cause) {
         super(cause);
     }
+
+    /**
+     * Transaction pending ack store provider exception.
+     */
+    public static class TransactionPendingAckStoreProviderException extends TransactionPendingAckException {
+
+        public TransactionPendingAckStoreProviderException(String message) {
+            super(message);
+        }
+
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotFoundException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/pendingack/package-info.java
similarity index 68%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotFoundException.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/pendingack/package-info.java
index 0f1dc46..0d0df63 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionNotFoundException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/exception/pendingack/package-info.java
@@ -16,16 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
 /**
- * Exception is thrown when the transaction is not found in the transaction buffer.
+ * Exceptions thrown when encountering errors in transaction buffer.
  */
-public class TransactionNotFoundException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public TransactionNotFoundException(String message) {
-        super(message);
-    }
-}
+package org.apache.pulsar.broker.transaction.exception.pendingack;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/TransactionPendingAckStoreProviderException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/TransactionPendingAckStoreProviderException.java
deleted file mode 100644
index 0f41e2c..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/TransactionPendingAckStoreProviderException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.transaction.pendingack.exceptions;
-
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionBufferException;
-
-/**
- * Transaction pending ack store provider exception.
- */
-public class TransactionPendingAckStoreProviderException extends TransactionBufferException {
-
-    public TransactionPendingAckStoreProviderException(String message) {
-        super(message);
-    }
-
-}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 548dd8a..5417cae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -26,9 +26,9 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
-import org.apache.pulsar.broker.transaction.pendingack.exceptions.TransactionPendingAckStoreProviderException;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.naming.TopicName;
 
@@ -45,7 +45,8 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
 
         if (subscription == null) {
             pendingAckStoreFuture.completeExceptionally(
-                    new TransactionPendingAckStoreProviderException("The subscription is null."));
+                    new TransactionPendingAckException
+                            .TransactionPendingAckStoreProviderException("The subscription is null."));
             return pendingAckStoreFuture;
         }
         PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java
index 513a78d..d4234fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java
@@ -33,7 +33,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferReader;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.testng.annotations.Test;
@@ -116,7 +116,7 @@ public class InMemTransactionBufferReaderTest {
                 reader.readNext(1).get();
                 fail("should fail to read entries if there is no more in the transaction buffer");
             } catch (ExecutionException ee) {
-                assertTrue(ee.getCause() instanceof EndOfTransactionException);
+                assertTrue(ee.getCause() instanceof TransactionBufferException.EndOfTransactionException);
             }
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
index 2238490..c10444a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
@@ -33,11 +33,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
-import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -86,7 +84,7 @@ public class TransactionBufferTest {
             buffer.openTransactionBufferReader(txnId, 0L).get();
             fail("Should fail to open reader if a transaction doesn't exist");
         } catch (ExecutionException ee) {
-            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+            assertTrue(ee.getCause() instanceof TransactionBufferException.TransactionNotFoundException);
         }
     }
 
@@ -102,7 +100,7 @@ public class TransactionBufferTest {
             buffer.openTransactionBufferReader(txnId, 0L).get();
             fail("Should fail to open a reader on an OPEN transaction");
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionNotSealedException);
+            assertTrue(e.getCause() instanceof TransactionBufferException.TransactionNotSealedException);
         }
     }
 
@@ -136,7 +134,7 @@ public class TransactionBufferTest {
             buffer.commitTxn(txnId, Long.MIN_VALUE).get();
             fail("Should fail to commit a transaction if it doesn't exist");
         } catch (ExecutionException ee) {
-            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+            assertTrue(ee.getCause() instanceof TransactionBufferException.TransactionNotFoundException);
         }
     }
 
@@ -160,7 +158,7 @@ public class TransactionBufferTest {
             buffer.abortTxn(txnId, Long.MIN_VALUE).get();
             fail("Should fail to abort a transaction if it doesn't exist");
         } catch (ExecutionException ee) {
-            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+            assertTrue(ee.getCause() instanceof TransactionBufferException.TransactionNotFoundException);
         }
     }
 
@@ -181,7 +179,7 @@ public class TransactionBufferTest {
             buffer.abortTxn(txnId, Long.MIN_VALUE).get();
             fail("Should fail to abort a committed transaction");
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionStatusException);
+            assertTrue(e.getCause() instanceof TransactionBufferException.TransactionStatusException);
         }
         txnMeta = buffer.getTransactionMeta(txnId).get();
         assertEquals(txnId, txnMeta.id());
@@ -277,7 +275,7 @@ public class TransactionBufferTest {
             buffer.getTransactionMeta(txnID).get();
             fail("Should fail to get transaction metadata if it doesn't exist");
         } catch (ExecutionException ee) {
-            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+            assertTrue(ee.getCause() instanceof TransactionBufferException.TransactionNotFoundException);
         }
     }
 }

[pulsar] 07/15: [CAPI] Support setting priority for consumers (#12526)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33b5d5fa5b965fc7da52542cb6a7f004e852a408
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Tue Nov 2 21:05:31 2021 +0900

    [CAPI] Support setting priority for consumers (#12526)
    
    ### Motivation
    
    We would like to make it possible for C API based client library to set the priority level for consumers.
    
    ### Modifications
    
    Add methods to set/get priority level for consumers.
    
    Co-authored-by: k2la <yf...@yahoo-corp.jp>
    (cherry picked from commit a75a9746a12a6461c87518434f0f550563718e9d)
---
 pulsar-client-cpp/include/pulsar/c/consumer_configuration.h |  6 ++++++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc          | 10 ++++++++++
 2 files changed, 16 insertions(+)

diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index efe353a..a11e11e 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -275,6 +275,12 @@ PULSAR_PUBLIC void pulsar_consumer_set_subscription_initial_position(
 PULSAR_PUBLIC void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf,
                                                               const char *name, const char *value);
 
+PULSAR_PUBLIC void pulsar_consumer_configuration_set_priority_level(
+    pulsar_consumer_configuration_t *consumer_configuration, int priority_level);
+
+PULSAR_PUBLIC int pulsar_consumer_configuration_get_priority_level(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader()
 //
 // const;
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index 90c60df..aaec12c 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -185,3 +185,13 @@ int pulsar_consumer_get_subscription_initial_position(
     pulsar_consumer_configuration_t *consumer_configuration) {
     return consumer_configuration->consumerConfiguration.getSubscriptionInitialPosition();
 }
+
+void pulsar_consumer_configuration_set_priority_level(pulsar_consumer_configuration_t *consumer_configuration,
+                                                      int priority_level) {
+    consumer_configuration->consumerConfiguration.setPriorityLevel(priority_level);
+}
+
+int pulsar_consumer_configuration_get_priority_level(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getPriorityLevel();
+}

[pulsar] 11/15: [docs] Fix doc for pulsar-admin bookies cmd (#12542)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b5430af68b1b7082569009b291f323ea4bae35e
Author: litao <to...@gmail.com>
AuthorDate: Thu Nov 4 22:50:38 2021 +0800

    [docs] Fix doc for pulsar-admin bookies cmd (#12542)
    
    (cherry picked from commit 938dd54ce6f4633e102df9e2ae1f35daa0fdb2bd)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java         | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
index e6551b9..c15e8db 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
@@ -41,7 +41,7 @@ public class CmdBookies extends CmdBase {
     @Parameters(commandDescription = "Gets the rack placement information for a specific bookie in the cluster")
     private class GetBookie extends CliCommand {
 
-        @Parameter(names = { "-b", "--bookie" }, description = "bookie address", required = true)
+        @Parameter(names = { "-b", "--bookie" }, description = "Bookie address (format: `address:port`)", required = true)
         private String bookieAddress;
 
         @Override
@@ -62,7 +62,7 @@ public class CmdBookies extends CmdBase {
     @Parameters(commandDescription = "Remove rack placement information for a specific bookie in the cluster")
     private class RemoveBookie extends CliCommand {
 
-        @Parameter(names = { "-b", "--bookie" }, description = "bookie address", required = true)
+        @Parameter(names = { "-b", "--bookie" }, description = "Bookie address (format: `address:port`)", required = true)
         private String bookieAddress;
 
         @Override