You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 06:58:59 UTC
[pulsar] 02/07: Increase the test stability of transactionTest (#11541)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3d57af10aa9fe7b76f65ec2b3755fe0b43efebef
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Aug 5 13:32:57 2021 +0800
Increase the test stability of transactionTest (#11541)
(cherry picked from commit bbe08fb97b468336c694627d79a799dd5560e4ef)
---
.../TopicTransactionBufferRecoverTest.java | 9 +++++---
.../TransactionClientReconnectTest.java | 6 ++++--
.../broker/transaction/TransactionProduceTest.java | 9 +++++---
.../pulsar/broker/transaction/TransactionTest.java | 4 ++--
.../broker/transaction/TransactionTestBase.java | 24 ++++++++++++++--------
.../pendingack/PendingAckInMemoryDeleteTest.java | 19 ++++-------------
.../pendingack/PendingAckPersistentTest.java | 5 +++--
.../client/impl/TransactionEndToEndTest.java | 9 ++++----
8 files changed, 45 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 6a3aaa1..d79d3c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -79,9 +79,10 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
private static final String SUBSCRIPTION_NAME = "test-recover";
private static final String TAKE_SNAPSHOT = NAMESPACE1 + "/take-snapshot";
private static final String ABORT_DELETE = NAMESPACE1 + "/abort-delete";
-
+ private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
+ setBrokerCount(1);
internalSetup();
String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
@@ -97,7 +98,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
- admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+ admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
if (pulsarClient != null) {
pulsarClient.shutdown();
@@ -108,7 +109,9 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
.enableTransaction(true)
.build();
- Thread.sleep(1000 * 3);
+
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
index c13dff7..8df470d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
@@ -46,7 +46,7 @@ import static org.testng.FileAssert.fail;
public class TransactionClientReconnectTest extends TransactionTestBase {
private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
-
+ private static final int NUM_PARTITIONS = 1;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setBrokerCount(1);
@@ -63,13 +63,15 @@ public class TransactionClientReconnectTest extends TransactionTestBase {
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest);
- admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
+ admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index e66525e..6729ff1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -81,9 +81,10 @@ public class TransactionProduceTest extends TransactionTestBase {
private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 + "/produce-abort";
private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
-
+ private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
+ setBrokerCount(1);
internalSetup();
String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
@@ -100,7 +101,7 @@ public class TransactionProduceTest extends TransactionTestBase {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
- admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+ admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
if (pulsarClient != null) {
pulsarClient.shutdown();
@@ -111,7 +112,9 @@ public class TransactionProduceTest extends TransactionTestBase {
.enableTransaction(true)
.build();
- Thread.sleep(1000 * 3);
+
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a319c5d..fb94638 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -93,6 +93,8 @@ public class TransactionTest extends TransactionTestBase {
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@Test
@@ -159,8 +161,6 @@ public class TransactionTest extends TransactionTestBase {
@Test
public void testGetTxnID() throws Exception {
- // wait tc init success to ready state
- Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, NUM_PARTITIONS));
Transaction transaction = pulsarClient.newTransaction()
.build().get();
TxnID txnID = transaction.getTxnID();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 43c5edb..0a694c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -296,16 +296,22 @@ public abstract class TransactionTestBase extends TestRetrySupport {
log.warn("Failed to clean up mocked pulsar service:", e);
}
}
- public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int numOfTCPerBroker){
+ public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
// wait tc init success to ready state
- Awaitility.await().untilAsserted(() -> {
- TransactionMetadataStore transactionMetadataStore =
- getPulsarServiceList().get(numOfBroker - 1).getTransactionMetadataStoreService()
- .getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1));
- assertNotNull(transactionMetadataStore);
- assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(),
- TransactionMetadataStoreState.State.Ready);
+ Awaitility.await().until(() -> {
+ Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
+ getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores();
+ if (stores.size() == numOfTCPerBroker) {
+ for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
+ if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState()
+ != TransactionMetadataStoreState.State.Ready) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
});
- return true;
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index 9cdb219..fc952c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -73,7 +73,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
private static final String TENANT = "tnx";
private static final String NAMESPACE1 = TENANT + "/ns1";
-
+ private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
setBrokerCount(1);
@@ -89,7 +89,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
- admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+ admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
if (pulsarClient != null) {
pulsarClient.shutdown();
@@ -102,19 +102,8 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
- Awaitility.await().until(() -> {
- if (stores.size() == 16) {
- for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
- if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState()
- != TransactionMetadataStoreState.State.Ready) {
- return false;
- }
- }
- return true;
- } else {
- return false;
- }
- });
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 4cdef2e..46c65b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -64,6 +64,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
private static final String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay";
private static final String NAMESPACE = "public/txn";
+ private static final int NUM_PARTITIONS = 16;
@BeforeMethod
public void setup() throws Exception {
@@ -87,8 +88,8 @@ public class PendingAckPersistentTest extends TransactionTestBase {
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
-
- Thread.sleep(1000 * 3);
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3da8a6c..f971b5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -93,9 +93,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
-
+ private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
+ setBrokerCount(1);
internalSetup();
String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
@@ -110,7 +111,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
- admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+ admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
if (pulsarClient != null) {
pulsarClient.close();
@@ -121,8 +122,8 @@ public class TransactionEndToEndTest extends TransactionTestBase {
.enableTransaction(true)
.build();
- Thread.sleep(1000 * 3);
- }
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(NUM_PARTITIONS); }
@AfterMethod(alwaysRun = true)
protected void cleanup() {