You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/02/27 02:14:39 UTC

[GitHub] Jackie-Jiang commented on a change in pull request #3885: Actively check cluster changes if there is no callback for a long time

Jackie-Jiang commented on a change in pull request #3885: Actively check cluster changes if there is no callback for a long time
URL: https://github.com/apache/incubator-pinot/pull/3885#discussion_r260568919
 
 

 ##########
 File path: pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
 ##########
 @@ -18,142 +18,136 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
-import java.util.ArrayList;
+import com.google.common.base.Preconditions;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Manages the interactions between Helix cluster changes, the routing table and the connection pool.
+ * The {@code ClusterChangeMediator} handles changes from the Helix cluster.
+ *
+ * <p>If there is no change callback for 1 hour, proactively check for changes so that they still get processed even
+ * when callbacks stop working.
+ * <p>NOTE: Helix batch mode is disabled; deduplication is performed in this class instead.
+ * <p>NOTE: Helix pre-fetch is disabled to reduce ZK accesses.
  */
-public class ClusterChangeMediator implements LiveInstanceChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener {
+@BatchMode(enabled = false)
+@PreFetch(enabled = false)
+public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
 
-  private enum UpdateType {
-    EXTERNAL_VIEW, INSTANCE_CONFIG
-  }
+  private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
+
+  private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
 
-  private final LinkedBlockingQueue<Pair<UpdateType, Long>> _clusterChangeQueue = new LinkedBlockingQueue<>(1000);
+  private final Thread _clusterChangeHandlingThread;
 
-  private Thread _deferredClusterUpdater = null;
+  public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> changeHandlerMap, BrokerMetrics brokerMetrics) {
+    _changeHandlerMap = changeHandlerMap;
 
-  public ClusterChangeMediator(HelixExternalViewBasedRouting helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager, final BrokerMetrics brokerMetrics) {
-    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    // Initialize last process time map
+    long initTime = System.currentTimeMillis();
+    for (ChangeType changeType : changeHandlerMap.keySet()) {
+      _lastProcessTimeMap.put(changeType, initTime);
+    }
 
-    // Simple thread that polls every 10 seconds to check if there are any cluster updates to apply
-    _deferredClusterUpdater = new Thread("Deferred cluster state updater") {
+    _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
       @Override
       public void run() {
         while (true) {
           try {
-            // Wait for at least one update
-            Pair<UpdateType, Long> firstUpdate = _clusterChangeQueue.take();
-
-            // Update the queue time metrics
-            long queueTime = System.currentTimeMillis() - firstUpdate.getValue();
-            brokerMetrics.addTimedValue(BrokerTimer.ROUTING_TABLE_UPDATE_QUEUE_TIME, queueTime, TimeUnit.MILLISECONDS);
-
-            // Take all other updates also present
-            List<Pair<UpdateType, Long>> allUpdates = new ArrayList<>();
-            allUpdates.add(firstUpdate);
-            _clusterChangeQueue.drainTo(allUpdates);
-
-            // Gather all update types
-            boolean externalViewUpdated = false;
-            boolean instanceConfigUpdated = false;
-
-            for (Pair<UpdateType, Long> update : allUpdates) {
-              if (update.getKey() == UpdateType.EXTERNAL_VIEW) {
-                externalViewUpdated = true;
-              } else if (update.getKey() == UpdateType.INSTANCE_CONFIG) {
-                instanceConfigUpdated = true;
-              }
-            }
-
-            if (externalViewUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processExternalViewChange();
-                _tableQueryQuotaManager.processQueryQuotaChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while updating external view", e);
+            for (Map.Entry<ChangeType, ClusterChangeHandler> entry : _changeHandlerMap.entrySet()) {
+              ChangeType changeType = entry.getKey();
+              ClusterChangeHandler changeHandler = entry.getValue();
+              long currentTime = System.currentTimeMillis();
+              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              if (lastChangeTime != null) {
+                brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
+                    TimeUnit.MILLISECONDS);
+                processClusterChange(changeType, changeHandler);
+              } else {
+                long lastProcessTime = _lastProcessTimeMap.get(changeType);
+                if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
 
 Review comment:
  This only acts as a fallback mechanism; we should get an INIT callback when we start the cluster.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org