You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:44 UTC

[pulsar] 13/15: [Transaction] Delete the redundant code (#13327)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 914f172592c6d205cc380ea3edb76e0f03904e0b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 17 14:47:12 2021 +0800

    [Transaction] Delete the redundant code (#13327)
    
    The underlying problem has been resolved, so the wait-and-retry method is no longer needed.
    1. Delete the redundant code
    2. Clean up the style of some code
    
    (cherry picked from commit fbe010323076ba2339e2339e3031a78e20b09061)
---
 .../broker/transaction/TransactionTestBase.java    | 14 -----
 .../pulsar/testclient/PerformanceConsumer.java     | 10 ++--
 .../pulsar/testclient/PerformanceProducer.java     | 12 +++--
 .../pulsar/testclient/PerformanceTransaction.java  | 20 +++++---
 .../pulsar/testclient/utils/PerformanceUtils.java  | 59 ----------------------
 .../testclient/PerformanceTransactionTest.java     | 23 ++++-----
 6 files changed, 38 insertions(+), 100 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index e13365d..fe7a813 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -61,8 +61,6 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.apache.zookeeper.ZooKeeper;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
 
 @Slf4j
 public abstract class TransactionTestBase extends TestRetrySupport {
@@ -144,8 +142,6 @@ public abstract class TransactionTestBase extends TestRetrySupport {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .enableTransaction(true)
                 .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(numPartitionsOfTC);
     }
 
     protected void startBroker() throws Exception {
@@ -332,14 +328,4 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             log.warn("Failed to clean up mocked pulsar service:", e);
         }
     }
-    public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
-        // wait tc init success to ready state
-        Awaitility.await()
-                .untilAsserted(() -> {
-                    int transactionMetaStoreCount = pulsarServiceList.stream()
-                            .mapToInt(pulsarService -> pulsarService.getTransactionMetadataStoreService().getStores().size())
-                            .sum();
-                    Assert.assertEquals(transactionMetaStoreCount, numOfTCPerBroker);
-                });
-    }
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 9c15a0e..3c7bed9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.testclient;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -362,8 +361,13 @@ public class PerformanceConsumer {
         }
         PulsarClient pulsarClient = clientBuilder.build();
 
-        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments.isEnableTransaction,
-                arguments.transactionTimeout);
+        AtomicReference<Transaction> atomicReference;
+        if (arguments.isEnableTransaction) {
+            atomicReference = new AtomicReference<>(pulsarClient.newTransaction()
+                    .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get());
+        } else {
+            atomicReference = new AtomicReference<>(null);
+        }
 
         AtomicLong messageAckedCount = new AtomicLong();
         Semaphore messageReceiveLimiter = new Semaphore(arguments.numMessagesPerTransaction);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 20eb8f3..0e3d550 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -74,7 +74,6 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
-import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction;
 
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -422,7 +421,7 @@ public class PerformanceProducer {
                 clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
             }
 
-            try (PulsarAdmin client = clientBuilder.build();) {
+            try (PulsarAdmin client = clientBuilder.build()) {
                 for (String topic : arguments.topics) {
                     log.info("Creating partitioned topic {} with {} partitions", topic, arguments.partitions);
                     try {
@@ -592,8 +591,15 @@ public class PerformanceProducer {
                     // enable round robin message routing if it is a partitioned topic
                     .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
 
+            AtomicReference<Transaction> transactionAtomicReference;
             if (arguments.isEnableTransaction) {
                 producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
+                transactionAtomicReference = new AtomicReference<>(client.newTransaction()
+                        .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS)
+                        .build()
+                        .get());
+            } else {
+                transactionAtomicReference = new AtomicReference<>(null);
             }
             if (arguments.producerName != null) {
                 String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId);
@@ -659,8 +665,6 @@ public class PerformanceProducer {
             }
             // Send messages on all topics/producers
             long totalSent = 0;
-            AtomicReference<Transaction> transactionAtomicReference = buildTransaction(client,
-                    arguments.isEnableTransaction, arguments.transactionTimeout);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 5127f85..eee284b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.testclient;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -284,7 +283,7 @@ public class PerformanceTransaction {
         ExecutorService executorService = new ThreadPoolExecutor(arguments.numTestThreads,
                 arguments.numTestThreads,
                 0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<>());
 
 
         long startTime = System.nanoTime();
@@ -311,16 +310,23 @@ public class PerformanceTransaction {
                     //A thread may perform tasks of multiple transactions in a traversing manner.
                     List<Producer<byte[]>> producers = null;
                     List<List<Consumer<byte[]>>> consumers = null;
+                    AtomicReference<Transaction> atomicReference = null;
                     try {
                         producers = buildProducers(client, arguments);
                         consumers = buildConsumer(client, arguments);
+                        if (!arguments.isDisableTransaction) {
+                            atomicReference = new AtomicReference<>(client.newTransaction()
+                                    .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS)
+                                    .build()
+                                    .get());
+                        } else {
+                            atomicReference = new AtomicReference<>(null);
+                        }
                     } catch (Exception e) {
                         log.error("Failed to build Producer/Consumer with exception : ", e);
                         executorService.shutdownNow();
                         PerfClientUtils.exit(-1);
                     }
-                    AtomicReference<Transaction> atomicReference = buildTransaction(client,
-                            !arguments.isDisableTransaction, arguments.transactionTimeout);
                     //The while loop has no break, and finally ends the execution through the shutdownNow of
                     //the executorService
                     while (true) {
@@ -351,7 +357,7 @@ public class PerformanceTransaction {
                         for (List<Consumer<byte[]>> subscriptions : consumers) {
                                 for (Consumer<byte[]> consumer : subscriptions) {
                                     for (int j = 0; j < arguments.numMessagesReceivedPerTransaction; j++) {
-                                        Message message = null;
+                                        Message<byte[]> message = null;
                                         try {
                                             message = consumer.receive();
                                         } catch (PulsarClientException e) {
@@ -690,9 +696,7 @@ public class PerformanceTransaction {
                 .sendTimeout(0, TimeUnit.SECONDS);
 
         final List<Future<Producer<byte[]>>> producerFutures = Lists.newArrayList();
-        Iterator<String> produceTopicsIterator = arguments.producerTopic.iterator();
-        while(produceTopicsIterator.hasNext()){
-            String topic = produceTopicsIterator.next();
+        for (String topic : arguments.producerTopic) {
             log.info("Create producer for topic {}", topic);
             producerFutures.add(producerBuilder.clone().topic(topic).createAsync());
         }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java
deleted file mode 100644
index ded1131..0000000
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.testclient.utils;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.testclient.PerformanceProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PerformanceUtils {
-
-    private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
-
-    public static AtomicReference<Transaction> buildTransaction(PulsarClient pulsarClient, boolean isEnableTransaction,
-                                                                long transactionTimeout) {
-
-        AtomicLong numBuildTxnFailed = new AtomicLong();
-        if (isEnableTransaction) {
-            while(true) {
-                AtomicReference atomicReference = null;
-                try {
-                    atomicReference = new AtomicReference(pulsarClient.newTransaction()
-                            .withTransactionTimeout(transactionTimeout, TimeUnit.SECONDS).build().get());
-                } catch (Exception e) {
-                    numBuildTxnFailed.incrementAndGet();
-                    if (numBuildTxnFailed.get()%10 == 0) {
-                        log.error("Failed to new a transaction with {} times", numBuildTxnFailed.get(), e);
-                    }
-                }
-                if (atomicReference != null && atomicReference.get() != null) {
-                    log.info("After {} failures, the transaction was created successfully for the first time",
-                            numBuildTxnFailed.get());
-                    return atomicReference;
-                }
-            }
-        }
-        return new AtomicReference<>(null);
-    }
-}
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index c5e62f7..a08fbe0 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -91,8 +91,8 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testTxnPerf() throws Exception {
         String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -np 1 -au %s";
-        String testConsumeTopic = testTopic + UUID.randomUUID().toString();
-        String testProduceTopic = testTopic + UUID.randomUUID().toString();
+        String testConsumeTopic = testTopic + UUID.randomUUID();
+        String testProduceTopic = testTopic + UUID.randomUUID();
         String testSub = "testSub";
         admin.topics().createPartitionedTopic(testConsumeTopic, 1);
         String args = String.format(argString, testConsumeTopic, testProduceTopic,
@@ -119,9 +119,8 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
         CountDownLatch countDownLatch = new CountDownLatch(50);
         for (int i = 0; i < 50
                 ; i++) {
-            produceToConsumeTopic.newMessage().value(("testConsume " + i).getBytes()).sendAsync().thenRun(() -> {
-                countDownLatch.countDown();
-            });
+            produceToConsumeTopic.newMessage().value(("testConsume " + i).getBytes()).sendAsync().thenRun(
+                    countDownLatch::countDown);
         }
 
         countDownLatch.await();
@@ -149,11 +148,11 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         for (int i = 0; i < 50; i++) {
-            Message message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
+            Message<byte[]> message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             consumeFromProduceTopic.acknowledge(message);
         }
-        Message message = consumeFromConsumeTopic.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumeFromConsumeTopic.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
         message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
@@ -187,16 +186,16 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .enableBatchIndexAcknowledgment(false)
                 .subscribe();
         for (int i = 0; i < totalMessage; i++) {
-           Message message = consumer.receive(2, TimeUnit.SECONDS);
+           Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(message);
            consumer.acknowledge(message);
         }
-        Message message = consumer.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
     }
 
     @Test
-    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException, ExecutionException {
+    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
         String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
         String subName = "sub";
         String topic = testTopic + UUID.randomUUID();
@@ -230,10 +229,10 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
                 .enableBatchIndexAcknowledgment(false)
                .subscribe();
         for (int i = 0; i < 5; i++) {
-            Message message = consumer.receive(2, TimeUnit.SECONDS);
+            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
         }
-        Message message = consumer.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(message);
     }