You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/12/02 12:20:24 UTC

[pulsar] branch master updated: Attempt to fix flakiness in ZKSessionTest and quarantine the test (#13087)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eaf2b5  Attempt to fix flakiness in ZKSessionTest and quarantine the test (#13087)
9eaf2b5 is described below

commit 9eaf2b5ed3dd73c00125292b1199ace8b8140fe3
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 2 14:18:13 2021 +0200

    Attempt to fix flakiness in ZKSessionTest and quarantine the test (#13087)
---
 .../pulsar/metadata/BaseMetadataStoreTest.java     |  5 ++++-
 .../org/apache/pulsar/metadata/TestZKServer.java   | 23 +++++++++++++++++-----
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |  8 +++++---
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 3b902ee..fcb6b77 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -43,7 +43,10 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport {
     @Override
     public final void cleanup() throws Exception {
         markCurrentSetupNumberCleaned();
-        zks.close();
+        if (zks != null) {
+            zks.close();
+            zks = null;
+        }
     }
 
     private static String createTempFolder() {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
index d936294..6605af4 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
@@ -45,6 +45,7 @@ import org.assertj.core.util.Files;
 
 @Slf4j
 public class TestZKServer implements AutoCloseable {
+    public static final int TICK_TIME = 1000;
     protected ZooKeeperServer zks;
     private final File zkDataDir;
     private ServerCnxnFactory serverFactory;
@@ -63,7 +64,7 @@ public class TestZKServer implements AutoCloseable {
     }
 
     public void start() throws Exception {
-        this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
+        this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, TICK_TIME);
         this.serverFactory = new NIOServerCnxnFactory();
         this.serverFactory.configure(new InetSocketAddress(zkPort), 1000);
         this.serverFactory.startup(zks, true);
@@ -97,16 +98,28 @@ public class TestZKServer implements AutoCloseable {
     }
 
     public void stop() throws Exception {
-        if (zks != null) {
-            zks.shutdown();
-            zks.getZKDatabase().close();
-            zks = null;
+        if (containerManager != null) {
+            containerManager.stop();
+            containerManager = null;
         }
 
         if (serverFactory != null) {
             serverFactory.shutdown();
             serverFactory = null;
         }
+
+        if (zks != null) {
+            SessionTracker sessionTracker = zks.getSessionTracker();
+            zks.shutdown();
+            zks.getZKDatabase().close();
+            if (sessionTracker instanceof Thread) {
+                Thread sessionTrackerThread = (Thread) sessionTracker;
+                sessionTrackerThread.interrupt();
+                sessionTrackerThread.join();
+            }
+            zks = null;
+        }
+
         log.info("Stopped test ZK server");
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 76e14bb..5f45f9c 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +40,7 @@ import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
+@Test(groups = "quarantine")
 public class ZKSessionTest extends BaseMetadataStoreTest {
 
     @Test
@@ -70,7 +72,7 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         @Cleanup
         MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
                 MetadataStoreConfig.builder()
-                        .sessionTimeoutMillis(10_000)
+                        .sessionTimeoutMillis(5_000)
                         .build());
 
         BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
@@ -166,13 +168,13 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionLost);
         // --- test  le1 can be leader
-        Awaitility.await()
+        Awaitility.await().atMost(Duration.ofSeconds(15))
                 .untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading)); // reacquire leadership
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.Reconnected);
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionReestablished);
-        Awaitility.await()
+        Awaitility.await().atMost(Duration.ofSeconds(15))
                 .untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
         assertTrue(store.get(path).join().isPresent());
     }