You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/14 20:31:32 UTC
[pulsar] branch master updated: [improve][tests] improved flaky test runs (#16011)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b1b25ef15be [improve][tests] improved flaky test runs (#16011)
b1b25ef15be is described below
commit b1b25ef15be4595e2284cd4bbd4b8cfe39ed0743
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Jun 14 13:31:24 2022 -0700
[improve][tests] improved flaky test runs (#16011)
* [improve][tests] improved flaky test runs
- improved PulsarFunctionTlsTests by reordering tearDown() logic
- improved ManagedLedgerFactoryImpl.shutdown() by closing cacheEviction threads early
- improved TestPulsarConnector memory consumption by removing unnecessary spy()
- improved PulsarFunctionsTest run by using receive() instead of receive(30, TimeUnit.SECONDS);
* Reverted PulsarFunctionsTest consumer.receive() change
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 6 +++---
.../org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java | 7 ++++---
.../src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java | 4 ++--
.../java/org/apache/pulsar/sql/presto/TestPulsarConnector.java | 4 ++--
4 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 48e85423e84..1a12f9da496 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -262,7 +262,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
long waitTimeMillis = (long) (1000 / evictionFrequency);
- while (true) {
+ while (!closed) {
try {
doCacheEviction();
@@ -514,6 +514,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
statsTask.cancel(true);
flushCursorsTask.cancel(true);
+ cacheEvictionExecutor.shutdownNow();
List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet());
List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size());
@@ -594,7 +595,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}));
}
}));
- cacheEvictionExecutor.shutdownNow();
entryCacheManager.clear();
return FutureUtil.waitForAll(futures);
}
@@ -608,6 +608,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
statsTask.cancel(true);
flushCursorsTask.cancel(true);
+ cacheEvictionExecutor.shutdownNow();
// take a snapshot of ledgers currently in the map to prevent race conditions
List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
@@ -651,7 +652,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
scheduledExecutor.shutdownNow();
- cacheEvictionExecutor.shutdownNow();
entryCacheManager.clear();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index fcba675a4b7..227088019f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -178,12 +178,13 @@ public class PulsarFunctionTlsTest {
void tearDown() throws Exception {
try {
for (int i = 0; i < BROKER_COUNT; i++) {
- if (pulsarServices[i] != null) {
- pulsarServices[i].close();
- }
if (pulsarAdmins[i] != null) {
pulsarAdmins[i].close();
}
+ if (pulsarServices[i] != null) {
+ pulsarServices[i].close();
+ }
+
}
bkEnsemble.stop();
} finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 61f68938f74..2cb0e62f8e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -174,9 +174,9 @@ public class PulsarFunctionTlsTest {
log.info("--- Shutting down ---");
try {
functionAdmin.close();
- bkEnsemble.stop();
- workerServer.stop();
functionsWorkerService.stop();
+ workerServer.stop();
+ bkEnsemble.stop();
} finally {
if (tempDirectory != null) {
tempDirectory.delete();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index fc13647e905..a03eb3f5a72 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -707,10 +707,10 @@ public abstract class TestPulsarConnector {
when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);
for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
- PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+ PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
topicsToColumnHandles.get(split.getKey()), split.getValue(),
pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(),
- new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory));
+ new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory);
this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
}
}