You are viewing a plain text version of this content. The canonical (hyperlinked) version is not reproduced in this plain text copy.
Posted to commits@druid.apache.org by am...@apache.org on 2024/03/08 06:38:01 UTC

(druid) branch 29.0.1 updated: Fix race in BaseNodeRoleWatcher tests (#16064) (#16079)

This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.1 by this push:
     new 6241af733c1 Fix race in BaseNodeRoleWatcher tests (#16064) (#16079)
6241af733c1 is described below

commit 6241af733c18e6b73bcf69e542a4b86252a83800
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Fri Mar 8 12:07:54 2024 +0530

    Fix race in BaseNodeRoleWatcher tests (#16064) (#16079)
    
    * Fix race in BaseNodeRoleWatcher tests
    
    * Make non static
---
 .../discovery/K8sDruidNodeDiscoveryProvider.java   |  2 +-
 .../CuratorDruidNodeDiscoveryProvider.java         |  2 +-
 .../druid/discovery/BaseNodeRoleWatcher.java       | 41 +++++++++++++---------
 .../druid/discovery/BaseNodeRoleWatcherTest.java   | 16 +++++----
 4 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index 3cdafe0952c..559b53b6080 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -210,7 +210,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
       this.k8sApiClient = k8sApiClient;
 
       this.nodeRole = nodeRole;
-      this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole);
+      this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, nodeRole);
 
       this.watcherErrorRetryWaitMS = watcherErrorRetryWaitMS;
     }
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index 7000246dfb4..1a3f8be9cb0 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -185,7 +185,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
       this.curatorFramework = curatorFramework;
       this.nodeRole = nodeRole;
       this.jsonMapper = jsonMapper;
-      this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole);
+      this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, nodeRole);
 
       // This is required to be single threaded from docs in PathChildrenCache.
       this.cacheExecutor = Execs.singleThreaded(
diff --git a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
index 5de0a14b6d5..0484e229a26 100644
--- a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
+++ b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
@@ -76,28 +76,24 @@ public class BaseNodeRoleWatcher
       NodeRole nodeRole
   )
   {
-    this(listenerExecutor, nodeRole, DEFAULT_TIMEOUT_SECONDS);
+    this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
   }
 
-  BaseNodeRoleWatcher(
+  public static BaseNodeRoleWatcher create(
       ScheduledExecutorService listenerExecutor,
-      NodeRole nodeRole,
-      long timeout
+      NodeRole nodeRole
   )
   {
-    this.nodeRole = nodeRole;
-    this.listenerExecutor = listenerExecutor;
-    this.listenerExecutor.schedule(
-        this::cacheInitializedTimedOut,
-        timeout,
-        TimeUnit.SECONDS
-    );
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole);
+    nodeRoleWatcher.scheduleTimeout(DEFAULT_TIMEOUT_SECONDS);
+    return nodeRoleWatcher;
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
     try {
-      cacheInitialized.await();
+      awaitInitialization();
     }
     catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
@@ -255,12 +251,9 @@ public class BaseNodeRoleWatcher
     synchronized (lock) {
       // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
       // counted down.
-      if (cacheInitialized.getCount() == 0) {
-        LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName());
-        return;
+      if (cacheInitialized.getCount() != 0) {
+        cacheInitialized(true);
       }
-
-      cacheInitialized(true);
     }
   }
 
@@ -352,6 +345,20 @@ public class BaseNodeRoleWatcher
     }
   }
 
+  void scheduleTimeout(long timeout)
+  {
+    listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        timeout,
+        TimeUnit.SECONDS
+    );
+  }
+
+  void awaitInitialization() throws InterruptedException
+  {
+    cacheInitialized.await();
+  }
+
   private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
   {
     listenerExecutor.submit(() -> {
diff --git a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
index f4fc454968d..fa3cd0f4483 100644
--- a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class BaseNodeRoleWatcherTest
@@ -62,7 +61,7 @@ public class BaseNodeRoleWatcherTest
   @Test(timeout = 60_000L)
   public void testGeneralUseSimulation()
   {
-    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER);
+    BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER);
 
     DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
     DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
@@ -139,7 +138,7 @@ public class BaseNodeRoleWatcherTest
   @Test(timeout = 60_000L)
   public void testRegisterListenerBeforeTimeout() throws InterruptedException
   {
-    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1);
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER);
 
     TestListener listener1 = new TestListener();
     nodeRoleWatcher.registerListener(listener1);
@@ -161,16 +160,18 @@ public class BaseNodeRoleWatcherTest
 
     assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
 
-    Assert.assertTrue(listener1.ready.await(1500, TimeUnit.MILLISECONDS));
+    nodeRoleWatcher.scheduleTimeout(0);
+    nodeRoleWatcher.awaitInitialization();
+
     Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
 
     assertListener(listener1, true, ImmutableList.of(broker1, broker3), ImmutableList.of());
   }
 
   @Test(timeout = 60_000L)
-  public void testGetAllNodesBeforeTimeout()
+  public void testGetAllNodesBeforeTimeout() throws InterruptedException
   {
-    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER, 1);
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, NodeRole.BROKER);
 
     TestListener listener1 = new TestListener();
     nodeRoleWatcher.registerListener(listener1);
@@ -193,6 +194,9 @@ public class BaseNodeRoleWatcherTest
 
     assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
 
+    nodeRoleWatcher.scheduleTimeout(0);
+    nodeRoleWatcher.awaitInitialization();
+
     Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size());
 
     Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org