Posted to commits@druid.apache.org by "gianm (via GitHub)" <gi...@apache.org> on 2024/03/01 17:34:30 UTC

Re: [PR] Handle uninitialized cache in Node role watchers (druid)

gianm commented on code in PR #15726:
URL: https://github.com/apache/druid/pull/15726#discussion_r1509295355


##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -209,13 +221,59 @@ private void removeNode(DiscoveryDruidNode druidNode)
     }
   }
 
+  public void cacheInitializedTimedOut()
+  {
+    synchronized (lock) {
+      LOGGER.warn("Cache for node role [%s] could not be initialized before timeout.", nodeRole.getJsonName());
+      // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+      // counted down.
+      if (cacheInitialized.getCount() == 0) {
+        LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName());
+        return;
+      }
+
+      // It is important to take a snapshot here as list of nodes might change by the time listeners process
+      // the changes.
+      List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
+      LOGGER.info(
+          "Node watcher of role [%s] is now initialized with %d nodes.",
+          nodeRole.getJsonName(),
+          currNodes.size());
+
+      for (DruidNodeDiscovery.Listener listener : nodeListeners) {
+        safeSchedule(
+            () -> {
+              listener.nodesAdded(currNodes);

Review Comment:
   This logic is very similar to `cacheInitialized()`; it just calls the timed-out version of the listener callback instead. As a matter of code structure and maintainability, it would be good to extract the shared logic into a private method like `cacheInitialized(boolean timedOut)`, and have both `cacheInitializedTimedOut()` and `cacheInitialized()` call that method.
   
   Also `cacheInitializedTimedOut()` should be private (or package-private if you're calling it from unit tests), rather than public. It's not part of the API for callers.
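
A minimal sketch of the suggested structure, not the actual Druid code. `WatcherSketch` and its nested `Listener` are hypothetical stand-ins for `BaseNodeRoleWatcher` and `DruidNodeDiscovery.Listener`. Listeners are notified synchronously here for brevity, and the sketch assumes the timed-out path also counts down the latch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for BaseNodeRoleWatcher; not the Druid class.
class WatcherSketch
{
  // Hypothetical stand-in for DruidNodeDiscovery.Listener.
  interface Listener
  {
    void nodeViewInitialized();

    void nodeViewInitializedTimedOut();
  }

  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);
  private final List<Listener> listeners = new ArrayList<>();

  void registerListener(Listener listener)
  {
    synchronized (lock) {
      listeners.add(listener);
    }
  }

  public void cacheInitialized()
  {
    cacheInitialized(false);
  }

  // Package-private rather than public: visible to unit tests, not part of the caller API.
  void cacheInitializedTimedOut()
  {
    cacheInitialized(true);
  }

  // Shared logic; the two entry points differ only in which listener callback fires.
  private void cacheInitialized(boolean timedOut)
  {
    synchronized (lock) {
      if (cacheInitialized.getCount() == 0) {
        return; // Already initialized or already timed out; nothing to do.
      }
      for (Listener listener : listeners) {
        if (timedOut) {
          listener.nodeViewInitializedTimedOut();
        } else {
          listener.nodeViewInitialized();
        }
      }
      // Assumption: both paths release the latch, so the shared logic runs at most once.
      cacheInitialized.countDown();
    }
  }
}
```

With this shape, whichever of the two entry points runs first wins, and the later call becomes a no-op.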



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        30L,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
+    if (cacheInitializationTimedOut) {
+      return unmodifiableNodes;
+    }
     boolean nodeViewInitialized;
     try {
-      nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+      nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);

Review Comment:
   There's no need to wait for 30s here, since the constructor sets up a 30s timeout on its own, and ensures that the list is populated if the 30s timeout is hit. So it's better to do `await()` here and then, when the await returns, return `unmodifiableNodes`.
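
Roughly what this could look like, as a sketch under stated assumptions rather than the real class. `AwaitSketch` is a hypothetical stand-in for `BaseNodeRoleWatcher`, nodes are plain strings, and the timeout is a constructor parameter only so the example runs quickly. The key assumption is that the scheduled timeout handler counts down the latch; without that, an unbounded `await()` could block forever.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for BaseNodeRoleWatcher; nodes are plain strings here.
class AwaitSketch
{
  private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
  private final Collection<String> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);

  AwaitSketch(ScheduledExecutorService exec, long timeoutMillis)
  {
    // Assumption: the timeout handler releases the latch, so waiters never block past the timeout.
    exec.schedule(() -> cacheInitialized.countDown(), timeoutMillis, TimeUnit.MILLISECONDS);
  }

  void addNode(String hostAndPort)
  {
    nodes.put(hostAndPort, hostAndPort);
  }

  Collection<String> getAllNodes()
  {
    try {
      // Unbounded wait: returns on normal initialization or when the scheduled timeout fires.
      cacheInitialized.await();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return unmodifiableNodes;
  }
}
```

Because the only timer lives in the constructor, `getAllNodes()` needs neither its own 30-second bound nor a separate timed-out flag.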



##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
     assertListener(listener3, true, nodesAdded, nodesRemoved);
   }
 
+  @Test(timeout = 60_000L)
+  public void testTimeOutAfterInitialization() throws InterruptedException

Review Comment:
   Please also include tests for the case where some nodes are added, but initialization is never called. In the test cases, verify the behavior of these scenarios:
   
   - `registerListener` is called before the timeout: verify the listener does get the proper notifications
   - `getAllNodes` is called before the timeout: verify the call does return the list of nodes
   
   These scenarios should be verified in separate test cases to ensure that they each work in isolation.



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        30L,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
+    if (cacheInitializationTimedOut) {

Review Comment:
   This short-circuit doesn't seem necessary.



##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
     assertListener(listener3, true, nodesAdded, nodesRemoved);
   }
 
+  @Test(timeout = 60_000L)
+  public void testTimeOutAfterInitialization() throws InterruptedException
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
+        Execs.scheduledSingleThreaded("BaseNodeRoleWatcher"),
+        NodeRole.BROKER
+    );
+    TestListener listener = new TestListener();
+    nodeRoleWatcher.registerListener(listener);
+    Thread.sleep(32_000);

Review Comment:
   We want tests to run quickly, so a 32s sleep is too long. You can handle this by introducing a parameter we can use for testing that controls the timeout, and set it shorter during the unit test.
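
One possible shape for that parameter, sketched with hypothetical names (`TimeoutParamSketch`, `DEFAULT_TIMEOUT_MILLIS`, `awaitTimeout`, and `demo` are not from the PR). The idea is an extra constructor that tests can use, while production callers keep the 30-second default, and a latch-based wait in place of a fixed sleep.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for BaseNodeRoleWatcher.
class TimeoutParamSketch
{
  static final long DEFAULT_TIMEOUT_MILLIS = 30_000L;

  private final CountDownLatch timedOut = new CountDownLatch(1);

  // Production callers keep the 30-second default.
  TimeoutParamSketch(ScheduledExecutorService exec)
  {
    this(exec, DEFAULT_TIMEOUT_MILLIS);
  }

  // Hypothetical test-facing constructor that lets a unit test shrink the timeout.
  TimeoutParamSketch(ScheduledExecutorService exec, long timeoutMillis)
  {
    exec.schedule(() -> timedOut.countDown(), timeoutMillis, TimeUnit.MILLISECONDS);
  }

  // Waits up to maxWaitMillis for the timeout handler to run; returns false if it has not.
  boolean awaitTimeout(long maxWaitMillis)
  {
    try {
      return timedOut.await(maxWaitMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Test-style usage: a 100 ms timeout and a bounded wait instead of a 32 s sleep.
  static boolean demo()
  {
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    try {
      return new TimeoutParamSketch(exec, 100L).awaitTimeout(5_000L);
    }
    finally {
      exec.shutdownNow();
    }
  }
}
```

Waiting on a latch with an upper bound also means the test finishes as soon as the handler runs, rather than always paying the full sleep.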



##########
server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java:
##########
@@ -235,6 +235,28 @@ public void nodeViewInitialized()
           }
         }
       }
+
+      @Override
+      public void nodeViewInitializedTimedOut()
+      {
+        synchronized (lock) {
+          if (uninitializedNodeRoles == 0) {
+            log.error("Unexpected call of nodeViewInitializedTimedOut()");
+            return;
+          }
+          uninitializedNodeRoles--;
+          if (uninitializedNodeRoles == 0) {
+            for (Listener listener : listeners) {
+              try {
+                listener.nodeViewInitializedTimedOut();
+              }
+              catch (Exception ex) {
+                log.error(ex, "Listener[%s].nodeViewInitializedTimedOut() threw exception. Ignored.", listener);

Review Comment:
   This logic looks very similar to the `nodeViewInitialized()` logic. It would be better to extract the shared logic to a private method like `nodeViewInitialized(boolean timedOut)`.
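
A sketch of that extraction, with hypothetical stand-in types (`AggregatingListenerSketch` is not the real anonymous listener in `DruidNodeDiscoveryProvider`). Logging is omitted. The sketch makes one simplifying assumption: the callback chosen for downstream listeners follows whichever role finishes last.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the per-role listener inside DruidNodeDiscoveryProvider.
class AggregatingListenerSketch
{
  // Hypothetical stand-in for DruidNodeDiscovery.Listener.
  interface Listener
  {
    void nodeViewInitialized();

    void nodeViewInitializedTimedOut();
  }

  private final Object lock = new Object();
  private final List<Listener> listeners = new ArrayList<>();
  private int uninitializedNodeRoles;

  AggregatingListenerSketch(int nodeRoles)
  {
    this.uninitializedNodeRoles = nodeRoles;
  }

  void registerListener(Listener listener)
  {
    synchronized (lock) {
      listeners.add(listener);
    }
  }

  public void nodeViewInitialized()
  {
    nodeViewInitialized(false);
  }

  public void nodeViewInitializedTimedOut()
  {
    nodeViewInitialized(true);
  }

  // Shared bookkeeping: count down the outstanding roles, then notify once all are done.
  private void nodeViewInitialized(boolean timedOut)
  {
    synchronized (lock) {
      if (uninitializedNodeRoles == 0) {
        return; // Unexpected extra call; the code in the diff logs an error here.
      }
      uninitializedNodeRoles--;
      if (uninitializedNodeRoles == 0) {
        for (Listener listener : listeners) {
          try {
            if (timedOut) {
              listener.nodeViewInitializedTimedOut();
            } else {
              listener.nodeViewInitialized();
            }
          }
          catch (Exception ex) {
            // Ignored so one failing listener does not block the rest; the diff logs it.
          }
        }
      }
    }
  }
}
```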



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -209,13 +221,59 @@ private void removeNode(DiscoveryDruidNode druidNode)
     }
   }
 
+  public void cacheInitializedTimedOut()
+  {
+    synchronized (lock) {
+      LOGGER.warn("Cache for node role [%s] could not be initialized before timeout.", nodeRole.getJsonName());

Review Comment:
   This method is scheduled unconditionally for 30s after construction. It shouldn't log any warnings if the cache has already been initialized by then; otherwise people will see these warnings even when nothing has actually timed out.
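
In other words, check the latch before logging anything. A minimal sketch with hypothetical names: `QuietTimeoutSketch` stands in for `BaseNodeRoleWatcher`, and the `warnings` list stands in for `LOGGER.warn` so the behavior is observable. It assumes the timed-out path also counts down the latch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for BaseNodeRoleWatcher.
class QuietTimeoutSketch
{
  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);

  // Stand-in for LOGGER.warn(...), kept in memory for illustration only.
  final List<String> warnings = new ArrayList<>();

  void cacheInitialized()
  {
    synchronized (lock) {
      cacheInitialized.countDown();
    }
  }

  void cacheInitializedTimedOut()
  {
    synchronized (lock) {
      // The latch is only counted down under the lock, so this check cannot race.
      if (cacheInitialized.getCount() == 0) {
        return; // Initialized in time: nothing timed out, so stay silent.
      }
      warnings.add("Cache could not be initialized before timeout.");
      cacheInitialized.countDown();
    }
  }
}
```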



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        30L,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
+    if (cacheInitializationTimedOut) {
+      return unmodifiableNodes;
+    }
     boolean nodeViewInitialized;
     try {
-      nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+      nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);
     }
     catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       nodeViewInitialized = false;
     }
     if (!nodeViewInitialized) {
-      LOGGER.info(
-          "Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
-          nodeRole.getJsonName()
-      );
+      cacheInitializedTimedOut();

Review Comment:
   There is no need to call `cacheInitializedTimedOut` here. It only needs to be called once, and the `schedule` in the constructor takes care of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

