You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/09/28 09:56:12 UTC

[pulsar] branch master updated: [improve][test] Improve TransactionEndToEndTest to reduce the execution time (#17790)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7648a119b10 [improve][test] Improve TransactionEndToEndTest to reduce the execution time (#17790)
7648a119b10 is described below

commit 7648a119b10098c0b14f15d5a54a30b0aba7716d
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Sep 28 17:56:00 2022 +0800

    [improve][test] Improve TransactionEndToEndTest to reduce the execution time (#17790)
    
    Fixes
    - https://github.com/apache/pulsar/issues/17623
    - https://github.com/apache/pulsar/issues/17637
    
    ### Motivation
    
    Manually release the resources used by each test, including `consumer`, `producer`, `pulsar client`, `transaction`, and `topic`, instead of running `setup` and `cleanup` before and after every test method. This saves the per-method `setup` and `cleanup` time.
    
    ### Modifications
    
    - Manually release resources at the end of each test instead of calling `cleanup` & `setup` for each method
    - Remove the unused method `markDeletePositionCheck`
    - Use `Integer.valueOf(int)` instead of `new Integer(int)`, because `new Integer(int)` is deprecated
    
    ### Matching PR in forked repository
    
    PR in forked repository:
    
    - https://github.com/poorbarcode/pulsar/pull/10
---
 .../client/impl/TransactionEndToEndTest.java       | 155 +++++++++++++++++----
 ...ransactionEndToEndWithoutBatchIndexAckTest.java |   4 +-
 2 files changed, 128 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index aea77bec136..e3fc05ae042 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -80,8 +80,8 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -96,14 +96,21 @@ public class TransactionEndToEndTest extends TransactionTestBase {
     protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
     protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
     protected static final int NUM_PARTITIONS = 16;
-    @BeforeMethod
+    @BeforeClass
     protected void setup() throws Exception {
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
         setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
         admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
     }
 
-    @AfterMethod(alwaysRun = true)
+    protected void resetTopicOutput() throws Exception {
+        admin.topics().deletePartitionedTopic(TOPIC_OUTPUT, true);
+        admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION);
+        admin.topics().deletePartitionedTopic(TOPIC_MESSAGE_ACK_TEST, true);
+        admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
+    }
+
+    @AfterClass(alwaysRun = true)
     protected void cleanup() {
         super.internalCleanup();
     }
@@ -167,6 +174,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         message = consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(message);
 
+        // cleanup.
+        producer.close();
+        consumer.close();
+        resetTopicOutput();
         log.info("message commit test enableBatch {}", enableBatch);
     }
 
@@ -175,7 +186,6 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         Transaction txn = getTxn();
         String subName = "test";
 
-        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(TOPIC_OUTPUT)
@@ -188,7 +198,6 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
         }
 
-        @Cleanup
         Consumer<byte[]> consumer = pulsarClient
                 .newConsumer()
                 .topic(TOPIC_OUTPUT)
@@ -253,6 +262,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             return flag;
         });
 
+        // cleanup.
+        producer.close();
+        consumer.close();
+        resetTopicOutput();
         log.info("finished test partitionAbortTest");
     }
 
@@ -311,6 +324,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             }
         }
         assertTrue(flag);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName, true);
     }
 
     @Test
@@ -406,6 +424,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
             }
         }
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(normalTopic, true);
     }
 
     @Test
@@ -423,6 +446,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         String content = "test";
         producer.send(content);
         assertEquals(consumer.receive().getValue(), content);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicTwo, true);
     }
 
     @Test
@@ -536,6 +564,10 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
         assertTrue(exist);
 
+        // cleanup.
+        producer.close();
+        consumer.close();
+        resetTopicOutput();
         log.info("receive transaction messages count: {}", receiveCnt);
     }
 
@@ -638,6 +670,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             message = consumer.receive(1, TimeUnit.SECONDS);
             Assert.assertNull(message);
         }
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(normalTopic, true);
     }
 
     private Transaction getTxn() throws Exception {
@@ -648,25 +685,6 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 .get();
     }
 
-    private void markDeletePositionCheck(String topic, String subName, boolean equalsWithLastConfirm) throws Exception {
-        for (int i = 0; i < TOPIC_PARTITION; i++) {
-            PersistentTopicInternalStats stats = null;
-            String checkTopic = TopicName.get(topic).getPartition(i).toString();
-            for (int j = 0; j < 10; j++) {
-                stats = admin.topics().getInternalStats(checkTopic, false);
-                if (stats.lastConfirmedEntry.equals(stats.cursors.get(subName).markDeletePosition)) {
-                    break;
-                }
-                Thread.sleep(200);
-            }
-            if (equalsWithLastConfirm) {
-                Assert.assertEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
-            } else {
-                Assert.assertNotEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
-            }
-        }
-    }
-
     @Test
     public void txnMetadataHandlerRecoverTest() throws Exception {
         String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
@@ -714,6 +732,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             Message<byte[]> message = consumer.receive();
             Assert.assertNotNull(message);
         }
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        recoverPulsarClient.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -748,9 +772,14 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             for (int i = 0; i < 1000; i++) {
                 Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
-                Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
+                Assert.assertEquals(Integer.valueOf(new String(message.getData())), Integer.valueOf(i));
             }
         }
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -855,10 +884,20 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         field.setAccessible(true);
         TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
         assertEquals(state, TransactionImpl.State.ERROR);
+
+        // cleanup.
+        timeoutTxn.abort();
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
     public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
+        Collection<TransactionMetadataStore> transactionMetadataStores =
+                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
+        long timeoutCountOriginal = transactionMetadataStores.stream()
+                .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
         TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
                 .newTransaction(new TransactionCoordinatorID(0), 1).get();
         Awaitility.await().until(() -> {
@@ -869,11 +908,9 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 return true;
             }
         });
-        Collection<TransactionMetadataStore> transactionMetadataStores =
-                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
         long timeoutCount = transactionMetadataStores.stream()
                 .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
-        Assert.assertEquals(timeoutCount, 1);
+        Assert.assertEquals(timeoutCount, timeoutCountOriginal + 1);
     }
 
     @Test
@@ -914,6 +951,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
 
         assertEquals(reReceiveMessage.getMessageId(), message.getMessageId());
 
+        // cleanup.
+        consumeTimeoutTxn.abort();
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, true);
     }
 
     @DataProvider(name = "ackType")
@@ -979,6 +1021,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
         txn.abort().get();
         assertTrue(exist);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1036,6 +1083,13 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             }
         }
         assertTrue(flag);
+
+        // cleanup.
+        txn.abort().get();
+        producer.close();
+        consumer1.close();
+        consumer2.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1070,6 +1124,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
                     .InvalidTxnStatusException);
         }
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1138,6 +1197,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         // then redeliver will not receive any message
         message = consumer.receive(3, TimeUnit.SECONDS);
         assertNull(message);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1172,6 +1236,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         } catch (PulsarClientException ex) {
             assertTrue(ex instanceof PulsarClientException.TimeoutException);
         }
+
+        // cleanup.
+        transaction.abort().get();
+        producer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1218,6 +1287,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         // ack one message, the unack count is 4
         assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
                 .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
+
+        // cleanup.
+        txn.abort().get();
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1274,6 +1349,14 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
 
         assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
+
+        // cleanup.
+        consumer.close();
+        deadLetterConsumer.close();
+        producer.close();
+        admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+                topic, subName), true);
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1341,6 +1424,14 @@ public class TransactionEndToEndTest extends TransactionTestBase {
 
         assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
         assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
+
+        // cleanup.
+        consumer.close();
+        deadLetterConsumer.close();
+        producer.close();
+        admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+                topic, subName), true);
+        admin.topics().delete(topic, true);
     }
 
     @Test
@@ -1400,5 +1491,11 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         for (int i = 0; i < 10; i++) {
             assertTrue(receivedMsgs.contains("msg-" + i));
         }
+
+        // cleanup.
+        sharedConsumer.close();
+        failoverConsumer.close();
+        producer.close();
+        admin.topics().delete(topic, true);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
index 1ef3998c346..0b50be807be 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
@@ -30,7 +30,7 @@ import org.testng.annotations.Test;
 @Test(groups = "flaky")
 public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest {
 
-    @BeforeMethod
+    @BeforeClass
     protected void setup() throws Exception {
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
         setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);