You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 06:58:59 UTC

[pulsar] 02/07: Increase the test stability of transactionTest (#11541)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3d57af10aa9fe7b76f65ec2b3755fe0b43efebef
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Aug 5 13:32:57 2021 +0800

    Increase the test stability of transactionTest (#11541)
    
    (cherry picked from commit bbe08fb97b468336c694627d79a799dd5560e4ef)
---
 .../TopicTransactionBufferRecoverTest.java         |  9 +++++---
 .../TransactionClientReconnectTest.java            |  6 ++++--
 .../broker/transaction/TransactionProduceTest.java |  9 +++++---
 .../pulsar/broker/transaction/TransactionTest.java |  4 ++--
 .../broker/transaction/TransactionTestBase.java    | 24 ++++++++++++++--------
 .../pendingack/PendingAckInMemoryDeleteTest.java   | 19 ++++-------------
 .../pendingack/PendingAckPersistentTest.java       |  5 +++--
 .../client/impl/TransactionEndToEndTest.java       |  9 ++++----
 8 files changed, 45 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 6a3aaa1..d79d3c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -79,9 +79,10 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
     private static final String SUBSCRIPTION_NAME = "test-recover";
     private static final String TAKE_SNAPSHOT = NAMESPACE1 + "/take-snapshot";
     private static final String ABORT_DELETE = NAMESPACE1 + "/abort-delete";
-
+    private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        setBrokerCount(1);
         internalSetup();
 
         String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
@@ -97,7 +98,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
 
         if (pulsarClient != null) {
             pulsarClient.shutdown();
@@ -108,7 +109,9 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
                 .enableTransaction(true)
                 .build();
 
-        Thread.sleep(1000 * 3);
+
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
index c13dff7..8df470d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
@@ -46,7 +46,7 @@ import static org.testng.FileAssert.fail;
 public class TransactionClientReconnectTest extends TransactionTestBase {
 
     private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
-
+    private static final int NUM_PARTITIONS = 1;
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         setBrokerCount(1);
@@ -63,13 +63,15 @@ public class TransactionClientReconnectTest extends TransactionTestBase {
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
         admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest);
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
 
         pulsarClient = PulsarClient.builder()
                 .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                 .statsInterval(0, TimeUnit.SECONDS)
                 .enableTransaction(true)
                 .build();
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index e66525e..6729ff1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -81,9 +81,10 @@ public class TransactionProduceTest extends TransactionTestBase {
     private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 + "/produce-abort";
     private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
     private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
-
+    private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        setBrokerCount(1);
         internalSetup();
 
         String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
@@ -100,7 +101,7 @@ public class TransactionProduceTest extends TransactionTestBase {
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
 
         if (pulsarClient != null) {
             pulsarClient.shutdown();
@@ -111,7 +112,9 @@ public class TransactionProduceTest extends TransactionTestBase {
                 .enableTransaction(true)
                 .build();
 
-        Thread.sleep(1000 * 3);
+
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a319c5d..fb94638 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -93,6 +93,8 @@ public class TransactionTest extends TransactionTestBase {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .enableTransaction(true)
                 .build();
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @Test
@@ -159,8 +161,6 @@ public class TransactionTest extends TransactionTestBase {
 
     @Test
     public void testGetTxnID() throws Exception {
-        // wait tc init success to ready state
-        Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, NUM_PARTITIONS));
         Transaction transaction = pulsarClient.newTransaction()
                 .build().get();
         TxnID txnID = transaction.getTxnID();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 43c5edb..0a694c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -296,16 +296,22 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             log.warn("Failed to clean up mocked pulsar service:", e);
         }
     }
-    public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int numOfTCPerBroker){
+    public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
         // wait tc init success to ready state
-        Awaitility.await().untilAsserted(() -> {
-            TransactionMetadataStore transactionMetadataStore =
-                    getPulsarServiceList().get(numOfBroker - 1).getTransactionMetadataStoreService()
-                            .getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1));
-            assertNotNull(transactionMetadataStore);
-            assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(),
-                    TransactionMetadataStoreState.State.Ready);
+        Awaitility.await().until(() -> {
+            Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
+                    getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores();
+            if (stores.size() == numOfTCPerBroker) {
+                for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
+                    if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState()
+                            != TransactionMetadataStoreState.State.Ready) {
+                        return false;
+                    }
+                }
+                return true;
+            } else {
+                return false;
+            }
         });
-        return true;
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index 9cdb219..fc952c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -73,7 +73,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
 
     private static final String TENANT = "tnx";
     private static final String NAMESPACE1 = TENANT + "/ns1";
-
+    private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
         setBrokerCount(1);
@@ -89,7 +89,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(NAMESPACE1);
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
 
         if (pulsarClient != null) {
             pulsarClient.shutdown();
@@ -102,19 +102,8 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
 
         Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
                 getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
-        Awaitility.await().until(() -> {
-            if (stores.size() == 16) {
-                for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
-                    if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState()
-                            != TransactionMetadataStoreState.State.Ready) {
-                        return false;
-                    }
-                }
-                return true;
-            } else {
-                return false;
-            }
-        });
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 4cdef2e..46c65b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -64,6 +64,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
 
     private static final String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay";
     private static final String NAMESPACE = "public/txn";
+    private static final int NUM_PARTITIONS = 16;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -87,8 +88,8 @@ public class PendingAckPersistentTest extends TransactionTestBase {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .enableTransaction(true)
                 .build();
-
-        Thread.sleep(1000 * 3);
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3da8a6c..f971b5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -93,9 +93,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
     private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
     private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
-
+    private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        setBrokerCount(1);
         internalSetup();
 
         String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
@@ -110,7 +111,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
 
         if (pulsarClient != null) {
             pulsarClient.close();
@@ -121,8 +122,8 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 .enableTransaction(true)
                 .build();
 
-        Thread.sleep(1000 * 3);
-    }
+        // Wait for all transaction coordinators (TCs) to reach the Ready state.
+        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);    }
 
     @AfterMethod(alwaysRun = true)
     protected void cleanup() {