You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/09/28 09:56:12 UTC
[pulsar] branch master updated: [improve][test] Improve TransactionEndToEndTest to reduce the execution time (#17790)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7648a119b10 [improve][test] Improve TransactionEndToEndTest to reduce the execution time (#17790)
7648a119b10 is described below
commit 7648a119b10098c0b14f15d5a54a30b0aba7716d
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Sep 28 17:56:00 2022 +0800
[improve][test] Improve TransactionEndToEndTest to reduce the execution time (#17790)
Fixes
- https://github.com/apache/pulsar/issues/17623
- https://github.com/apache/pulsar/issues/17637
### Motivation
Manually release resources, including `consumer`, `producer`, `pulsar client`, `transaction`, and `topic`. This saves `setup` and `cleanup` time before and after each method.
### Modifications
- Manually release resources instead of calling `cleanup` & `setup` each method
- remove useless method `markDeletePositionCheck`
- `Integer.valueOf(int)` instead of `new Integer(int)`, because `new Integer(int)` is deprecated
### Matching PR in forked repository
PR in forked repository:
- https://github.com/poorbarcode/pulsar/pull/10
---
.../client/impl/TransactionEndToEndTest.java | 155 +++++++++++++++++----
...ransactionEndToEndWithoutBatchIndexAckTest.java | 4 +-
2 files changed, 128 insertions(+), 31 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index aea77bec136..e3fc05ae042 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -80,8 +80,8 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -96,14 +96,21 @@ public class TransactionEndToEndTest extends TransactionTestBase {
protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
protected static final int NUM_PARTITIONS = 16;
- @BeforeMethod
+ @BeforeClass
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}
- @AfterMethod(alwaysRun = true)
+ protected void resetTopicOutput() throws Exception {
+ admin.topics().deletePartitionedTopic(TOPIC_OUTPUT, true);
+ admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION);
+ admin.topics().deletePartitionedTopic(TOPIC_MESSAGE_ACK_TEST, true);
+ admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
+ }
+
+ @AfterClass(alwaysRun = true)
protected void cleanup() {
super.internalCleanup();
}
@@ -167,6 +174,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
+ // cleanup.
+ producer.close();
+ consumer.close();
+ resetTopicOutput();
log.info("message commit test enableBatch {}", enableBatch);
}
@@ -175,7 +186,6 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Transaction txn = getTxn();
String subName = "test";
- @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(TOPIC_OUTPUT)
@@ -188,7 +198,6 @@ public class TransactionEndToEndTest extends TransactionTestBase {
producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
}
- @Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(TOPIC_OUTPUT)
@@ -253,6 +262,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
return flag;
});
+ // cleanup.
+ producer.close();
+ consumer.close();
+ resetTopicOutput();
log.info("finished test partitionAbortTest");
}
@@ -311,6 +324,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
}
assertTrue(flag);
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topicName, true);
}
@Test
@@ -406,6 +424,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
}
}
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(normalTopic, true);
}
@Test
@@ -423,6 +446,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topicTwo, true);
}
@Test
@@ -536,6 +564,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
assertTrue(exist);
+ // cleanup.
+ producer.close();
+ consumer.close();
+ resetTopicOutput();
log.info("receive transaction messages count: {}", receiveCnt);
}
@@ -638,6 +670,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNull(message);
}
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(normalTopic, true);
}
private Transaction getTxn() throws Exception {
@@ -648,25 +685,6 @@ public class TransactionEndToEndTest extends TransactionTestBase {
.get();
}
- private void markDeletePositionCheck(String topic, String subName, boolean equalsWithLastConfirm) throws Exception {
- for (int i = 0; i < TOPIC_PARTITION; i++) {
- PersistentTopicInternalStats stats = null;
- String checkTopic = TopicName.get(topic).getPartition(i).toString();
- for (int j = 0; j < 10; j++) {
- stats = admin.topics().getInternalStats(checkTopic, false);
- if (stats.lastConfirmedEntry.equals(stats.cursors.get(subName).markDeletePosition)) {
- break;
- }
- Thread.sleep(200);
- }
- if (equalsWithLastConfirm) {
- Assert.assertEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
- } else {
- Assert.assertNotEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
- }
- }
- }
-
@Test
public void txnMetadataHandlerRecoverTest() throws Exception {
String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
@@ -714,6 +732,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Message<byte[]> message = consumer.receive();
Assert.assertNotNull(message);
}
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ recoverPulsarClient.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -748,9 +772,14 @@ public class TransactionEndToEndTest extends TransactionTestBase {
for (int i = 0; i < 1000; i++) {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
- Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
+ Assert.assertEquals(Integer.valueOf(new String(message.getData())), Integer.valueOf(i));
}
}
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -855,10 +884,20 @@ public class TransactionEndToEndTest extends TransactionTestBase {
field.setAccessible(true);
TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
assertEquals(state, TransactionImpl.State.ERROR);
+
+ // cleanup.
+ timeoutTxn.abort();
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, true);
}
@Test
public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
+ Collection<TransactionMetadataStore> transactionMetadataStores =
+ getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
+ long timeoutCountOriginal = transactionMetadataStores.stream()
+ .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(0), 1).get();
Awaitility.await().until(() -> {
@@ -869,11 +908,9 @@ public class TransactionEndToEndTest extends TransactionTestBase {
return true;
}
});
- Collection<TransactionMetadataStore> transactionMetadataStores =
- getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
long timeoutCount = transactionMetadataStores.stream()
.mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
- Assert.assertEquals(timeoutCount, 1);
+ Assert.assertEquals(timeoutCount, timeoutCountOriginal + 1);
}
@Test
@@ -914,6 +951,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
assertEquals(reReceiveMessage.getMessageId(), message.getMessageId());
+ // cleanup.
+ consumeTimeoutTxn.abort();
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, true);
}
@DataProvider(name = "ackType")
@@ -979,6 +1021,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
txn.abort().get();
assertTrue(exist);
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -1036,6 +1083,13 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
}
assertTrue(flag);
+
+ // cleanup.
+ txn.abort().get();
+ producer.close();
+ consumer1.close();
+ consumer2.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -1070,6 +1124,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -1138,6 +1197,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
// then redeliver will not receive any message
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -1172,6 +1236,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
} catch (PulsarClientException ex) {
assertTrue(ex instanceof PulsarClientException.TimeoutException);
}
+
+ // cleanup.
+ transaction.abort().get();
+ producer.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -1218,6 +1287,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
// ack one message, the unack count is 4
assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
+
+ // cleanup.
+ txn.abort().get();
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topic, true);
}
@Test
@@ -1274,6 +1349,14 @@ public class TransactionEndToEndTest extends TransactionTestBase {
assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
+
+ // cleanup.
+ consumer.close();
+ deadLetterConsumer.close();
+ producer.close();
+ admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+ topic, subName), true);
+ admin.topics().delete(topic, true);
}
@Test
@@ -1341,6 +1424,14 @@ public class TransactionEndToEndTest extends TransactionTestBase {
assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
+
+ // cleanup.
+ consumer.close();
+ deadLetterConsumer.close();
+ producer.close();
+ admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+ topic, subName), true);
+ admin.topics().delete(topic, true);
}
@Test
@@ -1400,5 +1491,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
for (int i = 0; i < 10; i++) {
assertTrue(receivedMsgs.contains("msg-" + i));
}
+
+ // cleanup.
+ sharedConsumer.close();
+ failoverConsumer.close();
+ producer.close();
+ admin.topics().delete(topic, true);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
index 1ef3998c346..0b50be807be 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
@@ -30,7 +30,7 @@ import org.testng.annotations.Test;
@Test(groups = "flaky")
public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest {
- @BeforeMethod
+ @BeforeClass
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);