You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/28 21:09:06 UTC

[incubator-pinot] branch master updated: Actively check cluster changes if there is no callback for a long time (#3885)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4964f91  Actively check cluster changes if there is no callback for a long time (#3885)
4964f91 is described below

commit 4964f91d43647031d192b1b34d52af4ddbd87ec1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 28 13:09:01 2019 -0800

    Actively check cluster changes if there is no callback for a long time (#3885)
    
    We recently encountered a site issue and suspect that the Helix callback for cluster changes stopped working, probably because of a ZK re-connection.
    This PR enables a proactive cluster change check whenever there has been no callback for 1 hour.
    
    Changes include:
    1. Rewrite ClusterChangeMediator to proactively perform cluster change check
    2. Disable the Helix batch-mode and perform deduplication in ClusterChangeMediator
    3. Disable the Helix pre-fetch to reduce the ZK accesses
    4. Add interface ClusterChangeHandler for general cluster change handling
    5. Add ClusterChangeHandler implementation: ExternalViewChangeHandler, InstanceConfigChangeHandler, LiveInstanceChangeHandler
    6. Add metrics to track CLUSTER_CHANGE_QUEUE_TIME and PROACTIVE_CLUSTER_CHANGE_CHECK count
    
    Testing done:
    1. Ran ClusterChangeMediator without starting the cluster change handling thread; the backup inline change processing works properly
    2. Set the proactive change check interval to 1 second; both callback-driven and proactive change checks work properly
---
 .../pinot/broker/broker/BrokerServerBuilder.java   |  10 +-
 .../broker/broker/helix/ClusterChangeHandler.java  |  33 +---
 .../broker/broker/helix/ClusterChangeMediator.java | 207 ++++++++++++---------
 .../broker/helix/ExternalViewChangeHandler.java    |  43 +++++
 .../broker/broker/helix/HelixBrokerStarter.java    |  32 +++-
 .../broker/helix/InstanceConfigChangeHandler.java  |  32 +---
 ...nerImpl.java => LiveInstanceChangeHandler.java} |  56 +++---
 .../ConnectionPoolBrokerRequestHandler.java        |  11 +-
 .../apache/pinot/common/metrics/BrokerMeter.java   |   4 +-
 .../apache/pinot/common/metrics/BrokerTimer.java   |   4 +-
 10 files changed, 240 insertions(+), 192 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index dced4b2..39b25bf 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.LiveInstancesChangeListenerImpl;
+import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
 import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
@@ -60,7 +60,7 @@ public class BrokerServerBuilder {
   private final long _delayedShutdownTimeMs;
   private final RoutingTable _routingTable;
   private final TimeBoundaryService _timeBoundaryService;
-  private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
+  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
   private final AccessControlFactory _accessControlFactory;
   private final MetricsRegistry _metricsRegistry;
@@ -69,13 +69,13 @@ public class BrokerServerBuilder {
   private final BrokerAdminApiApplication _brokerAdminApplication;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) {
+      LiveInstanceChangeHandler liveInstanceChangeHandler, TableQueryQuotaManager tableQueryQuotaManager) {
     _state.set(State.INIT);
     _config = config;
     _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
-    _liveInstanceChangeListener = liveInstanceChangeListener;
+    _liveInstanceChangeHandler = liveInstanceChangeHandler;
     _tableQueryQuotaManager = tableQueryQuotaManager;
     _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
     _metricsRegistry = new MetricsRegistry();
@@ -96,7 +96,7 @@ public class BrokerServerBuilder {
     } else {
       LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
       return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
+          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeHandler, _metricsRegistry);
     }
   }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
similarity index 52%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
index ebd8740..e0a3e6c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
@@ -16,37 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
-
-import org.apache.pinot.common.Utils;
-
+package org.apache.pinot.broker.broker.helix;
 
 /**
- * Enumeration containing all the timers exposed by the Pinot broker.
- *
+ * Handles cluster changes such as external view changes, instance config changes, live instance changes etc.
  */
-public enum BrokerTimer implements AbstractMetrics.Timer {
-  ROUTING_TABLE_UPDATE_TIME(true), ROUTING_TABLE_UPDATE_QUEUE_TIME(true);
-  private final String timerName;
-  private final boolean global;
-
-  BrokerTimer(boolean global) {
-    this.global = global;
-    this.timerName = Utils.toCamelCase(name().toLowerCase());
-  }
-
-  @Override
-  public String getTimerName() {
-    return timerName;
-  }
+public interface ClusterChangeHandler {
 
-  /**
-   * Returns true if the timer is global (not attached to a particular resource)
-   *
-   * @return true if the timer is global
-   */
-  @Override
-  public boolean isGlobal() {
-    return global;
-  }
+  void processClusterChange();
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 9a8be33..1d2dbe6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -18,21 +18,21 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.slf4j.Logger;
@@ -40,120 +40,143 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Manages the interactions between Helix cluster changes, the routing table and the connection pool.
+ * The {@code ClusterChangeMediator} handles the changes from Helix cluster.
+ * <p>
+ * <p>If there is no change callback in 1 hour, proactively check changes so that the changes are getting processed even
+ * when callbacks stop working.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can give us more control on the
+ * frequency of change checks, and let us track the cluster change queue time.
+ * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
  */
-public class ClusterChangeMediator implements LiveInstanceChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener {
+@BatchMode(enabled = false)
+@PreFetch(enabled = false)
+public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
 
-  private enum UpdateType {
-    EXTERNAL_VIEW, INSTANCE_CONFIG
-  }
+  // Add 1 second interval between change checks to deduplicate multiple changes of the same type
+  private static final long CHANGE_CHECK_INTERVAL_MS = 1000L;
+  // If no change got for 1 hour, proactively check changes
+  private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
+
+  private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
 
-  private final LinkedBlockingQueue<Pair<UpdateType, Long>> _clusterChangeQueue = new LinkedBlockingQueue<>(1000);
+  private final Thread _clusterChangeHandlingThread;
 
-  private Thread _deferredClusterUpdater = null;
+  private volatile boolean _stopped = false;
 
-  public ClusterChangeMediator(HelixExternalViewBasedRouting helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager, final BrokerMetrics brokerMetrics) {
-    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+  public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> changeHandlerMap, BrokerMetrics brokerMetrics) {
+    _changeHandlerMap = changeHandlerMap;
+
+    // Initialize last process time map
+    long initTime = System.currentTimeMillis();
+    for (ChangeType changeType : changeHandlerMap.keySet()) {
+      _lastProcessTimeMap.put(changeType, initTime);
+    }
 
-    // Simple thread that polls every 10 seconds to check if there are any cluster updates to apply
-    _deferredClusterUpdater = new Thread("Deferred cluster state updater") {
+    _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
       @Override
       public void run() {
-        while (true) {
+        while (!_stopped) {
           try {
-            // Wait for at least one update
-            Pair<UpdateType, Long> firstUpdate = _clusterChangeQueue.take();
-
-            // Update the queue time metrics
-            long queueTime = System.currentTimeMillis() - firstUpdate.getValue();
-            brokerMetrics.addTimedValue(BrokerTimer.ROUTING_TABLE_UPDATE_QUEUE_TIME, queueTime, TimeUnit.MILLISECONDS);
-
-            // Take all other updates also present
-            List<Pair<UpdateType, Long>> allUpdates = new ArrayList<>();
-            allUpdates.add(firstUpdate);
-            _clusterChangeQueue.drainTo(allUpdates);
-
-            // Gather all update types
-            boolean externalViewUpdated = false;
-            boolean instanceConfigUpdated = false;
-
-            for (Pair<UpdateType, Long> update : allUpdates) {
-              if (update.getKey() == UpdateType.EXTERNAL_VIEW) {
-                externalViewUpdated = true;
-              } else if (update.getKey() == UpdateType.INSTANCE_CONFIG) {
-                instanceConfigUpdated = true;
+            for (Map.Entry<ChangeType, ClusterChangeHandler> entry : _changeHandlerMap.entrySet()) {
+              ChangeType changeType = entry.getKey();
+              ClusterChangeHandler changeHandler = entry.getValue();
+              long currentTime = System.currentTimeMillis();
+              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              if (lastChangeTime != null) {
+                brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
+                    TimeUnit.MILLISECONDS);
+                processClusterChange(changeType, changeHandler);
+              } else {
+                long lastProcessTime = _lastProcessTimeMap.get(changeType);
+                if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
+                  LOGGER.info("Proactive check {} change", changeType);
+                  brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L);
+                  processClusterChange(changeType, changeHandler);
+                }
               }
             }
 
-            if (externalViewUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processExternalViewChange();
-                _tableQueryQuotaManager.processQueryQuotaChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while updating external view", e);
-              }
-            }
-
-            if (instanceConfigUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processInstanceConfigChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while processing instance config", e);
-              }
-            }
-          } catch (InterruptedException e) {
-            LOGGER.warn("Was interrupted while waiting for a cluster change", e);
-            break;
+            // Add an interval between change checks to deduplicate multiple changes of the same type
+            Thread.sleep(CHANGE_CHECK_INTERVAL_MS);
+          } catch (Exception e) {
+            LOGGER.error("Caught exception within cluster change handling thread", e);
           }
         }
-
-        LOGGER.warn("Stopping deferred cluster state update thread");
-        _deferredClusterUpdater = null;
       }
     };
+  }
 
-    _deferredClusterUpdater.start();
+  private void processClusterChange(ChangeType changeType, ClusterChangeHandler changeHandler) {
+    long startTime = System.currentTimeMillis();
+    LOGGER.info("Start processing {} change", changeType);
+    changeHandler.processClusterChange();
+    long endTime = System.currentTimeMillis();
+    LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - startTime);
+    _lastProcessTimeMap.put(changeType, endTime);
+  }
+
+  /**
+   * Starts the cluster change mediator.
+   */
+  public void start() {
+    LOGGER.info("Starting the cluster change handling thread");
+    _clusterChangeHandlingThread.start();
+  }
+
+  /**
+   * Stops the cluster change mediator.
+   */
+  public void stop() {
+    LOGGER.info("Stopping the cluster change handling thread");
+    _stopped = true;
+    try {
+      _clusterChangeHandlingThread.join();
+    } catch (InterruptedException e) {
+      LOGGER.error("Caught InterruptedException while waiting for cluster change handling thread to die");
+      Thread.currentThread().interrupt();
+    }
   }
 
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
-    // If the deferred update thread is alive, defer the update
-    if (_deferredClusterUpdater != null && _deferredClusterUpdater.isAlive()) {
-      try {
-        _clusterChangeQueue.put(new ImmutablePair<>(UpdateType.EXTERNAL_VIEW, System.currentTimeMillis()));
-      } catch (InterruptedException e) {
-        LOGGER.warn("Was interrupted while trying to add external view change to queue", e);
-      }
-    } else {
-      LOGGER.warn(
-          "Deferred cluster updater thread is null or stopped, not deferring external view routing table rebuild");
-      _helixExternalViewBasedRouting.processExternalViewChange();
-      _tableQueryQuotaManager.processQueryQuotaChange();
-    }
+    // External view list should be empty because Helix pre-fetch is disabled
+    assert externalViewList.isEmpty();
+
+    enqueueChange(ChangeType.EXTERNAL_VIEW);
   }
 
   @Override
-  public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) {
-    // If the deferred update thread is alive, defer the update
-    if (_deferredClusterUpdater != null && _deferredClusterUpdater.isAlive()) {
-      try {
-        _clusterChangeQueue.put(new ImmutablePair<>(UpdateType.INSTANCE_CONFIG, System.currentTimeMillis()));
-      } catch (InterruptedException e) {
-        LOGGER.warn("Was interrupted while trying to add external view change to queue", e);
-      }
-    } else {
-      LOGGER.warn(
-          "Deferred cluster updater thread is null or stopped, not deferring instance config change notification");
-      _helixExternalViewBasedRouting.processInstanceConfigChange();
-    }
+  public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext changeContext) {
+    // Instance config list should be empty because Helix pre-fetch is disabled
+    assert instanceConfigs.isEmpty();
+
+    enqueueChange(ChangeType.INSTANCE_CONFIG);
   }
 
   @Override
   public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+    // Live instance list should be empty because Helix pre-fetch is disabled
+    assert liveInstances.isEmpty();
+
+    enqueueChange(ChangeType.LIVE_INSTANCE);
+  }
+
+  /**
+   * Helper method to enqueue a change from the Helix callback to be processed by the cluster change handling thread. If
+   * the handling thread is dead, directly process the change.
+   *
+   * @param changeType Type of the change
+   */
+  private void enqueueChange(ChangeType changeType) {
+    if (_clusterChangeHandlingThread.isAlive()) {
+      LOGGER.info("Enqueue {} change", changeType);
+      _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+    } else {
+      LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType);
+      processClusterChange(changeType, _changeHandlerMap.get(changeType));
+    }
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
new file mode 100644
index 0000000..1422953
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+
+
+/**
+ * Cluster change handler for external view changes.
+ */
+public class ExternalViewChangeHandler implements ClusterChangeHandler {
+  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+  private final TableQueryQuotaManager _tableQueryQuotaManager;
+
+  public ExternalViewChangeHandler(HelixExternalViewBasedRouting helixExternalViewBasedRouting,
+      TableQueryQuotaManager tableQueryQuotaManager) {
+    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
+    _tableQueryQuotaManager = tableQueryQuotaManager;
+  }
+
+  @Override
+  public void processClusterChange() {
+    _helixExternalViewBasedRouting.processExternalViewChange();
+    _tableQueryQuotaManager.processQueryQuotaChange();
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index c446c70..1ff0de6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -21,12 +21,15 @@ package org.apache.pinot.broker.broker.helix;
 import com.google.common.collect.ImmutableList;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -68,9 +71,10 @@ public class HelixBrokerStarter {
   private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
   private final BrokerServerBuilder _brokerServerBuilder;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final LiveInstancesChangeListenerImpl _liveInstancesListener;
+  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private final MetricsRegistry _metricsRegistry;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final ClusterChangeMediator _clusterChangeMediator;
   private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
 
   // Set after broker is started, which is actually in the constructor.
@@ -90,8 +94,6 @@ public class HelixBrokerStarter {
       throws Exception {
     LOGGER.info("Starting Pinot broker");
 
-    _liveInstancesListener = new LiveInstancesChangeListenerImpl(helixClusterName);
-
     _pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
 
     if (brokerHost == null) {
@@ -118,14 +120,21 @@ public class HelixBrokerStarter {
     _helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
         pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
     _tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager);
+    _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager);
+    Map<ChangeType, ClusterChangeHandler> clusterChangeHandlerMap = new HashMap<>();
+    clusterChangeHandlerMap.put(ChangeType.EXTERNAL_VIEW,
+        new ExternalViewChangeHandler(_helixExternalViewBasedRouting, _tableQueryQuotaManager));
+    clusterChangeHandlerMap
+        .put(ChangeType.INSTANCE_CONFIG, new InstanceConfigChangeHandler(_helixExternalViewBasedRouting));
+    clusterChangeHandlerMap.put(ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandler);
     _brokerServerBuilder = startBroker(_pinotHelixProperties);
     _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
-    ClusterChangeMediator clusterChangeMediator =
-        new ClusterChangeMediator(_helixExternalViewBasedRouting, _tableQueryQuotaManager,
-            _brokerServerBuilder.getBrokerMetrics());
-    _spectatorHelixManager.addExternalViewChangeListener(clusterChangeMediator);
-    _spectatorHelixManager.addInstanceConfigChangeListener(clusterChangeMediator);
-    _spectatorHelixManager.addLiveInstanceChangeListener(_liveInstancesListener);
+    _clusterChangeMediator =
+        new ClusterChangeMediator(clusterChangeHandlerMap, _brokerServerBuilder.getBrokerMetrics());
+    _clusterChangeMediator.start();
+    _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
+    _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
+    _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
 
     // Connect participant Helix manager.
     _helixManager =
@@ -193,7 +202,7 @@ public class HelixBrokerStarter {
       config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
     }
     BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager);
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
     _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
     _tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
@@ -281,10 +290,13 @@ public class HelixBrokerStarter {
       LOGGER.info("Disconnecting spectator Helix manager");
       _spectatorHelixManager.disconnect();
     }
+
     if (_tbiMessageHandler != null) {
       LOGGER.info("Shutting down timeboundary info refresh message handler");
       _tbiMessageHandler.shutdown();
     }
+
+    _clusterChangeMediator.stop();
   }
 
   public MetricsRegistry getMetricsRegistry() {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
similarity index 53%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
index ebd8740..bfba1af 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
@@ -16,37 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.broker.broker.helix;
 
-import org.apache.pinot.common.Utils;
+import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 
 
 /**
- * Enumeration containing all the timers exposed by the Pinot broker.
- *
+ * Cluster change handler for instance config changes.
  */
-public enum BrokerTimer implements AbstractMetrics.Timer {
-  ROUTING_TABLE_UPDATE_TIME(true), ROUTING_TABLE_UPDATE_QUEUE_TIME(true);
-  private final String timerName;
-  private final boolean global;
-
-  BrokerTimer(boolean global) {
-    this.global = global;
-    this.timerName = Utils.toCamelCase(name().toLowerCase());
-  }
+public class InstanceConfigChangeHandler implements ClusterChangeHandler {
+  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
 
-  @Override
-  public String getTimerName() {
-    return timerName;
+  public InstanceConfigChangeHandler(HelixExternalViewBasedRouting helixExternalViewBasedRouting) {
+    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
   }
 
-  /**
-   * Returns true if the timer is global (not attached to a particular resource)
-   *
-   * @return true if the timer is global
-   */
   @Override
-  public boolean isGlobal() {
-    return global;
+  public void processClusterChange() {
+    _helixExternalViewBasedRouting.processInstanceConfigChange();
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstancesChangeListenerImpl.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
similarity index 66%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstancesChangeListenerImpl.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
index 5c192ad..aaecb47 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstancesChangeListenerImpl.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
@@ -21,8 +21,9 @@ package org.apache.pinot.broker.broker.helix;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.model.LiveInstance;
 import org.apache.pinot.common.response.ServerInstance;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -32,35 +33,40 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class LiveInstancesChangeListenerImpl implements LiveInstanceChangeListener {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(LiveInstancesChangeListenerImpl.class);
+/**
+ * Cluster change handler for live instance changes.
+ */
+public class LiveInstanceChangeHandler implements ClusterChangeHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LiveInstanceChangeHandler.class);
 
   private static final boolean DO_NOT_RECREATE = false;
 
-  private long timeout;
-  private final Map<String, String> liveInstanceToSessionIdMap;
-  private KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> connectionPool;
+  private final HelixDataAccessor _helixDataAccessor;
+  private final PropertyKey _liveInstancesKey;
 
-  public LiveInstancesChangeListenerImpl(String clusterName) {
-    this.liveInstanceToSessionIdMap = new HashMap<String, String>();
+  private KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> _connectionPool;
+  private Map<String, String> _liveInstanceToSessionIdMap;
+
+  public LiveInstanceChangeHandler(HelixManager helixManager) {
+    _helixDataAccessor = helixManager.getHelixDataAccessor();
+    _liveInstancesKey = new PropertyKey.Builder(helixManager.getClusterName()).liveInstances();
   }
 
-  public void init(final KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> connectionPool,
-      final long timeout) {
-    this.connectionPool = connectionPool;
-    this.timeout = timeout;
+  public void init(KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> connectionPool) {
+    _connectionPool = connectionPool;
+    _liveInstanceToSessionIdMap = new HashMap<>();
   }
 
   @Override
-  public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
-    if (connectionPool == null) {
-      LOGGER.warn("init has not been called on the live instances listener, ignoring live instance change.");
+  public void processClusterChange() {
+    // Skip processing live instance change for single-connection routing
+    if (_connectionPool == null) {
       return;
     }
 
-    for (LiveInstance instance : liveInstances) {
+    List<LiveInstance> liveInstances = _helixDataAccessor.getChildValues(_liveInstancesKey);
 
+    for (LiveInstance instance : liveInstances) {
       String instanceId = instance.getInstanceName();
       String sessionId = instance.getSessionId();
 
@@ -80,15 +86,15 @@ public class LiveInstancesChangeListenerImpl implements LiveInstanceChangeListen
             .warn("Port for server instance {} does not appear to be numeric, defaulting to {}.", instanceId, port, e);
       }
 
-      if (liveInstanceToSessionIdMap.containsKey(instanceId)) {
+      if (_liveInstanceToSessionIdMap.containsKey(instanceId)) {
         // sessionId has changed
-        if (!sessionId.equals(liveInstanceToSessionIdMap.get(instanceId))) {
+        if (!sessionId.equals(_liveInstanceToSessionIdMap.get(instanceId))) {
           try {
             LOGGER.info("Instance {} has changed session id {} -> {}, validating connection pool for this instance.",
-                instanceId, sessionId, liveInstanceToSessionIdMap.get(instanceId));
+                instanceId, sessionId, _liveInstanceToSessionIdMap.get(instanceId));
             ServerInstance ins = ServerInstance.forHostPort(hostName, port);
-            connectionPool.validatePool(ins, DO_NOT_RECREATE);
-            liveInstanceToSessionIdMap.put(instanceId, sessionId);
+            _connectionPool.validatePool(ins, DO_NOT_RECREATE);
+            _liveInstanceToSessionIdMap.put(instanceId, sessionId);
           } catch (Exception e) {
             LOGGER.error("Error trying to validate & destroy dead connections for {}", instanceId, e);
           }
@@ -99,8 +105,8 @@ public class LiveInstancesChangeListenerImpl implements LiveInstanceChangeListen
         // lets first check if the connection is valid or not
         try {
           ServerInstance ins = ServerInstance.forHostPort(hostName, port);
-          connectionPool.validatePool(ins, DO_NOT_RECREATE);
-          liveInstanceToSessionIdMap.put(instanceId, sessionId);
+          _connectionPool.validatePool(ins, DO_NOT_RECREATE);
+          _liveInstanceToSessionIdMap.put(instanceId, sessionId);
         } catch (Exception e) {
           LOGGER.error("Error trying to destroy dead connections for {}", instanceId, e);
         }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index 068a137..0202315 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.broker.helix.LiveInstancesChangeListenerImpl;
+import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
 import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -51,7 +51,6 @@ import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.response.ServerInstance;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.serde.SerDe;
@@ -80,7 +79,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
   private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolBrokerRequestHandler.class);
   private static final String TRANSPORT_CONFIG_PREFIX = "pinot.broker.transport";
 
-  private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
+  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private final EventLoopGroup _eventLoopGroup;
   private final ScheduledThreadPoolExecutor _poolTimeoutExecutor;
   private final ExecutorService _requestSenderPool;
@@ -90,9 +89,9 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
   public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
       TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) {
+      LiveInstanceChangeHandler liveInstanceChangeHandler, MetricsRegistry metricsRegistry) {
     super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
-    _liveInstanceChangeListener = liveInstanceChangeListener;
+    _liveInstanceChangeHandler = liveInstanceChangeHandler;
 
     TransportClientConf transportClientConf = new TransportClientConf();
     transportClientConf.init(_config.subset(TRANSPORT_CONFIG_PREFIX));
@@ -120,7 +119,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
   @Override
   public synchronized void start() {
     _connPool.start();
-    _liveInstanceChangeListener.init(_connPool, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS);
+    _liveInstanceChangeHandler.init(_connPool);
   }
 
   @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 55c05a5..170e745 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -97,7 +97,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   // Netty connection metrics
   NETTY_CONNECTION_REQUESTS_SENT("nettyConnection", true),
   NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
-  NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true);
+  NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+
+  PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true);
 
   private final String brokerMeterName;
   private final String unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
index ebd8740..694db60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
@@ -26,7 +26,9 @@ import org.apache.pinot.common.Utils;
  *
  */
 public enum BrokerTimer implements AbstractMetrics.Timer {
-  ROUTING_TABLE_UPDATE_TIME(true), ROUTING_TABLE_UPDATE_QUEUE_TIME(true);
+  ROUTING_TABLE_UPDATE_TIME(true),
+  CLUSTER_CHANGE_QUEUE_TIME(true);
+
   private final String timerName;
   private final boolean global;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org