You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/02/27 01:52:13 UTC

[GitHub] mcvsubbu commented on a change in pull request #3885: Actively check cluster changes if there is no callback for a long time

mcvsubbu commented on a change in pull request #3885: Actively check cluster changes if there is no callback for a long time
URL: https://github.com/apache/incubator-pinot/pull/3885#discussion_r260563318
 
 

 ##########
 File path: pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
 ##########
 @@ -18,142 +18,136 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
-import java.util.ArrayList;
+import com.google.common.base.Preconditions;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Manages the interactions between Helix cluster changes, the routing table and the connection pool.
+ * The {@code ClusterChangeMediator} handles the changes from Helix cluster.
+ * <p>
+ * <p>If there is no change callback in 1 hour, proactively check changes so that the changes are getting processed even
+ * when callbacks stop working.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class.
+ * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
  */
-public class ClusterChangeMediator implements LiveInstanceChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener {
+@BatchMode(enabled = false)
+@PreFetch(enabled = false)
+public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
 
-  private enum UpdateType {
-    EXTERNAL_VIEW, INSTANCE_CONFIG
-  }
+  private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
+
+  private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
 
-  private final LinkedBlockingQueue<Pair<UpdateType, Long>> _clusterChangeQueue = new LinkedBlockingQueue<>(1000);
+  private final Thread _clusterChangeHandlingThread;
 
-  private Thread _deferredClusterUpdater = null;
+  public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> changeHandlerMap, BrokerMetrics brokerMetrics) {
+    _changeHandlerMap = changeHandlerMap;
 
-  public ClusterChangeMediator(HelixExternalViewBasedRouting helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager, final BrokerMetrics brokerMetrics) {
-    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    // Initialize last process time map
+    long initTime = System.currentTimeMillis();
+    for (ChangeType changeType : changeHandlerMap.keySet()) {
+      _lastProcessTimeMap.put(changeType, initTime);
+    }
 
-    // Simple thread that polls every 10 seconds to check if there are any cluster updates to apply
-    _deferredClusterUpdater = new Thread("Deferred cluster state updater") {
+    _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
       @Override
       public void run() {
         while (true) {
           try {
-            // Wait for at least one update
-            Pair<UpdateType, Long> firstUpdate = _clusterChangeQueue.take();
-
-            // Update the queue time metrics
-            long queueTime = System.currentTimeMillis() - firstUpdate.getValue();
-            brokerMetrics.addTimedValue(BrokerTimer.ROUTING_TABLE_UPDATE_QUEUE_TIME, queueTime, TimeUnit.MILLISECONDS);
-
-            // Take all other updates also present
-            List<Pair<UpdateType, Long>> allUpdates = new ArrayList<>();
-            allUpdates.add(firstUpdate);
-            _clusterChangeQueue.drainTo(allUpdates);
-
-            // Gather all update types
-            boolean externalViewUpdated = false;
-            boolean instanceConfigUpdated = false;
-
-            for (Pair<UpdateType, Long> update : allUpdates) {
-              if (update.getKey() == UpdateType.EXTERNAL_VIEW) {
-                externalViewUpdated = true;
-              } else if (update.getKey() == UpdateType.INSTANCE_CONFIG) {
-                instanceConfigUpdated = true;
-              }
-            }
-
-            if (externalViewUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processExternalViewChange();
-                _tableQueryQuotaManager.processQueryQuotaChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while updating external view", e);
+            for (Map.Entry<ChangeType, ClusterChangeHandler> entry : _changeHandlerMap.entrySet()) {
+              ChangeType changeType = entry.getKey();
+              ClusterChangeHandler changeHandler = entry.getValue();
+              long currentTime = System.currentTimeMillis();
+              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              if (lastChangeTime != null) {
+                brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
+                    TimeUnit.MILLISECONDS);
+                processClusterChange(changeType, changeHandler);
+              } else {
+                long lastProcessTime = _lastProcessTimeMap.get(changeType);
+                if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
+                  LOGGER.info("Proactive check {} change", changeType);
+                  brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L);
+                  processClusterChange(changeType, changeHandler);
+                }
               }
             }
 
-            if (instanceConfigUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processInstanceConfigChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while processing instance config", e);
-              }
-            }
+            // Sleep 1 second after each round of checks
+            Thread.sleep(1000L);
           } catch (InterruptedException e) {
-            LOGGER.warn("Was interrupted while waiting for a cluster change", e);
+            LOGGER.warn("Cluster change handling thread is interrupted, stopping the thread");
             break;
+          } catch (Exception e) {
+            LOGGER.error("Caught exception while handling changes", e);
           }
         }
-
-        LOGGER.warn("Stopping deferred cluster state update thread");
-        _deferredClusterUpdater = null;
       }
     };
+    _clusterChangeHandlingThread.start();
+  }
 
-    _deferredClusterUpdater.start();
+  private void processClusterChange(ChangeType changeType, ClusterChangeHandler changeHandler) {
+    long startTime = System.currentTimeMillis();
+    LOGGER.info("Start processing {} change", changeType);
+    changeHandler.processClusterChange();
+    long endTime = System.currentTimeMillis();
+    LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - startTime);
+    _lastProcessTimeMap.put(changeType, endTime);
   }
 
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
-    // If the deferred update thread is alive, defer the update
-    if (_deferredClusterUpdater != null && _deferredClusterUpdater.isAlive()) {
-      try {
-        _clusterChangeQueue.put(new ImmutablePair<>(UpdateType.EXTERNAL_VIEW, System.currentTimeMillis()));
-      } catch (InterruptedException e) {
-        LOGGER.warn("Was interrupted while trying to add external view change to queue", e);
-      }
-    } else {
-      LOGGER.warn(
-          "Deferred cluster updater thread is null or stopped, not deferring external view routing table rebuild");
-      _helixExternalViewBasedRouting.processExternalViewChange();
-      _tableQueryQuotaManager.processQueryQuotaChange();
-    }
+    Preconditions.checkState(externalViewList.isEmpty(), "Helix pre-fetch should be disabled");
 
 Review comment:
  If I understand correctly, pre-fetch is controlled by an annotation on this method. Why not add that annotation and set it to false? Put another way: what is an admin supposed to do if they see this precondition fail?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org