You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/03/01 18:34:33 UTC

[incubator-pinot] branch master updated: In ClusterChangeMediator, remove sleep and make it notify based (#3898)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dda23f  In ClusterChangeMediator, remove sleep and make it notify based (#3898)
9dda23f is described below

commit 9dda23f791c0b2fd6f09c83e973516941e724748
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Mar 1 10:34:28 2019 -0800

    In ClusterChangeMediator, remove sleep and make it notify based (#3898)
    
    When we get the Helix cluster change callbacks, we want to process them ASAP.
    Replace sleep() with wait() and notify() so that the new changes are processed immediately.
    
    Testing done:
    - Check that the queue time matches the processing time
    - Run tests without starting the cluster change handling thread
    - Reduce the proactive change check interval to 1 second and check that it interacts correctly with Helix callbacks
---
 .../broker/broker/helix/ClusterChangeMediator.java | 39 +++++++++++++++-------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 1d2dbe6..0653ddc 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
  * <p>
  * <p>If there is no change callback in 1 hour, proactively check changes so that the changes are getting processed even
  * when callbacks stop working.
- * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can give us more control on the
- * frequency of change checks, and let us track the cluster change queue time.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can save us the extra threads for
+ * handling Helix batch-mode, and let us track the cluster change queue time.
  * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
  */
 @BatchMode(enabled = false)
@@ -53,14 +53,12 @@ import org.slf4j.LoggerFactory;
 public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
 
-  // Add 1 second interval between change checks to deduplicate multiple changes of the same type
-  private static final long CHANGE_CHECK_INTERVAL_MS = 1000L;
   // If no change got for 1 hour, proactively check changes
   private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
 
   private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
-  private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
-  private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>();
 
   private final Thread _clusterChangeHandlingThread;
 
@@ -84,7 +82,10 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
               ChangeType changeType = entry.getKey();
               ClusterChangeHandler changeHandler = entry.getValue();
               long currentTime = System.currentTimeMillis();
-              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              Long lastChangeTime;
+              synchronized (_lastChangeTimeMap) {
+                lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              }
               if (lastChangeTime != null) {
                 brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
                     TimeUnit.MILLISECONDS);
@@ -98,10 +99,16 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
                 }
               }
             }
-
-            // Add an interval between change checks to deduplicate multiple changes of the same type
-            Thread.sleep(CHANGE_CHECK_INTERVAL_MS);
+            synchronized (_lastChangeTimeMap) {
+              // Wait for at most 1/10 of proactive change check interval if no new event received. This can guarantee
+              // that the proactive change check will not be delayed for more than 1/10 of the interval. In case of
+              // spurious wakeup, execute the while loop again for the proactive change check.
+              if (_lastChangeTimeMap.isEmpty()) {
+                _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 10);
+              }
+            }
           } catch (Exception e) {
+            // Ignore all exceptions. The thread keeps running until ClusterChangeMediator.stop() is invoked.
             LOGGER.error("Caught exception within cluster change handling thread", e);
           }
         }
@@ -132,6 +139,9 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
   public void stop() {
     LOGGER.info("Stopping the cluster change handling thread");
     _stopped = true;
+    synchronized (_lastChangeTimeMap) {
+      _lastChangeTimeMap.notify();
+    }
     try {
       _clusterChangeHandlingThread.join();
     } catch (InterruptedException e) {
@@ -173,7 +183,12 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
   private void enqueueChange(ChangeType changeType) {
     if (_clusterChangeHandlingThread.isAlive()) {
       LOGGER.info("Enqueue {} change", changeType);
-      _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+      synchronized (_lastChangeTimeMap) {
+        if (!_lastChangeTimeMap.containsKey(changeType)) {
+          _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+          _lastChangeTimeMap.notify();
+        }
+      }
     } else {
       LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType);
       processClusterChange(changeType, _changeHandlerMap.get(changeType));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org