You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/03/01 03:04:53 UTC

[incubator-pinot] branch cluster_change_mediator created (now 3d68076)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch cluster_change_mediator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 3d68076  In ClusterChangeMediator, remove sleep and make it notify based

This branch includes the following new commits:

     new 3d68076  In ClusterChangeMediator, remove sleep and make it notify based

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: In ClusterChangeMediator, remove sleep and make it notify based

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch cluster_change_mediator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3d68076bfa32bdc8c68c0f7ce39cb28ae94b798d
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Feb 28 18:50:04 2019 -0800

    In ClusterChangeMediator, remove sleep and make it notify based
    
    When we receive Helix cluster change callbacks, we want to process them as soon as possible.
    Replace sleep() with wait() and notify() so that new changes are processed immediately.
    
    Testing done:
    - Check that the queue time matches with the processing time
    - Run tests without starting the cluster change handling thread
    - Reduce the proactive change check interval to 1 second and check that it interacts correctly with Helix callbacks
---
 .../broker/broker/helix/ClusterChangeMediator.java | 37 +++++++++++++++-------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 1d2dbe6..728a366 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
  * <p>
  * <p>If there is no change callback in 1 hour, proactively check changes so that the changes are getting processed even
  * when callbacks stop working.
- * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can give us more control on the
- * frequency of change checks, and let us track the cluster change queue time.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can save us the extra threads for
+ * handling Helix batch-mode, and let us track the cluster change queue time.
  * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
  */
 @BatchMode(enabled = false)
@@ -53,14 +53,12 @@ import org.slf4j.LoggerFactory;
 public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
 
-  // Add 1 second interval between change checks to deduplicate multiple changes of the same type
-  private static final long CHANGE_CHECK_INTERVAL_MS = 1000L;
   // If no change got for 1 hour, proactively check changes
   private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
 
   private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
-  private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
-  private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>();
 
   private final Thread _clusterChangeHandlingThread;
 
@@ -84,7 +82,10 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
               ChangeType changeType = entry.getKey();
               ClusterChangeHandler changeHandler = entry.getValue();
               long currentTime = System.currentTimeMillis();
-              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              Long lastChangeTime;
+              synchronized (_lastChangeTimeMap) {
+                lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              }
               if (lastChangeTime != null) {
                 brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
                     TimeUnit.MILLISECONDS);
@@ -98,10 +99,16 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
                 }
               }
             }
-
-            // Add an interval between change checks to deduplicate multiple changes of the same type
-            Thread.sleep(CHANGE_CHECK_INTERVAL_MS);
+            synchronized (_lastChangeTimeMap) {
+              // Wait for at most 1/10 of proactive change check interval if no new event received. This can guarantee
+              // that the proactive change check will not be delayed for more than 1/10 of the interval. In case of
+              // spurious wakeup, execute the while loop again for the proactive change check.
+              if (_lastChangeTimeMap.isEmpty()) {
+                _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 10);
+              }
+            }
           } catch (Exception e) {
+            // Ignore all exceptions. The thread keeps running until ClusterChangeMediator.stop() is invoked.
             LOGGER.error("Caught exception within cluster change handling thread", e);
           }
         }
@@ -132,6 +139,9 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
   public void stop() {
     LOGGER.info("Stopping the cluster change handling thread");
     _stopped = true;
+    synchronized (_lastChangeTimeMap) {
+      _lastChangeTimeMap.notify();
+    }
     try {
       _clusterChangeHandlingThread.join();
     } catch (InterruptedException e) {
@@ -173,7 +183,10 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
   private void enqueueChange(ChangeType changeType) {
     if (_clusterChangeHandlingThread.isAlive()) {
       LOGGER.info("Enqueue {} change", changeType);
-      _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+      synchronized (_lastChangeTimeMap) {
+        _lastChangeTimeMap.putIfAbsent(changeType, System.currentTimeMillis());
+        _lastChangeTimeMap.notify();
+      }
     } else {
       LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType);
       processClusterChange(changeType, _changeHandlerMap.get(changeType));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org