You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:14 UTC

[pulsar] 09/29: [improve][tests] improved flaky test runs (#16011)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f7a6afa2e3b166faf9902fe97a82c7e6902ddaf
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Jun 14 13:31:24 2022 -0700

    [improve][tests] improved flaky test runs (#16011)
    
    * [improve][tests] improved flaky test runs
    - improved both PulsarFunctionTlsTest classes by reordering tearDown() logic
    - improved ManagedLedgerFactoryImpl.shutdown() by shutting down the cacheEviction executor earlier
    - improved TestPulsarConnector memory consumption by removing unnecessary spy()
    - improved PulsarFunctionsTest run by using receive() instead of receive(30, TimeUnit.SECONDS)
    
    * Reverted PulsarFunctionsTest consumer.receive() change
    
    (cherry picked from commit b1b25ef15be4595e2284cd4bbd4b8cfe39ed0743)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java   | 6 +++---
 .../org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java  | 7 ++++---
 .../src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java  | 4 ++--
 .../java/org/apache/pulsar/sql/presto/TestPulsarConnector.java     | 4 ++--
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 5ba1dc14c44..d829efad87f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -257,7 +257,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
         long waitTimeMillis = (long) (1000 / evictionFrequency);
 
-        while (true) {
+        while (!closed) {
             try {
                 doCacheEviction();
 
@@ -509,6 +509,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         statsTask.cancel(true);
         flushCursorsTask.cancel(true);
+        cacheEvictionExecutor.shutdownNow();
 
         List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet());
         List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size());
@@ -589,7 +590,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 }));
             }
         }));
-        cacheEvictionExecutor.shutdownNow();
         entryCacheManager.clear();
         return FutureUtil.waitForAll(futures);
     }
@@ -603,6 +603,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         statsTask.cancel(true);
         flushCursorsTask.cancel(true);
+        cacheEvictionExecutor.shutdownNow();
 
         // take a snapshot of ledgers currently in the map to prevent race conditions
         List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
@@ -646,7 +647,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         }
 
         scheduledExecutor.shutdownNow();
-        cacheEvictionExecutor.shutdownNow();
 
         entryCacheManager.clear();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 64e3b588236..844596743d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -177,12 +177,13 @@ public class PulsarFunctionTlsTest {
     void tearDown() throws Exception {
         try {
             for (int i = 0; i < BROKER_COUNT; i++) {
-                if (pulsarServices[i] != null) {
-                    pulsarServices[i].close();
-                }
                 if (pulsarAdmins[i] != null) {
                     pulsarAdmins[i].close();
                 }
+                if (pulsarServices[i] != null) {
+                    pulsarServices[i].close();
+                }
+
             }
             bkEnsemble.stop();
         } finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 15ee27dc3a5..9f6d9f65dbb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -171,9 +171,9 @@ public class PulsarFunctionTlsTest {
         log.info("--- Shutting down ---");
         try {
             functionAdmin.close();
-            bkEnsemble.stop();
-            workerServer.stop();
             functionsWorkerService.stop();
+            workerServer.stop();
+            bkEnsemble.stop();
         } finally {
             if (tempDirectory != null) {
                 tempDirectory.delete();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 7db32f59148..e7b19c8e5f5 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -699,10 +699,10 @@ public abstract class TestPulsarConnector {
         when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);
 
         for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
-            PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+            PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
                     topicsToColumnHandles.get(split.getKey()), split.getValue(),
                     pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(),
-                    new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory));
+                    new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory);
             this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
         }
     }