You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/28 21:09:06 UTC
[incubator-pinot] branch master updated: Actively check cluster
changes if there is no callback for a long time (#3885)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4964f91 Actively check cluster changes if there is no callback for a long time (#3885)
4964f91 is described below
commit 4964f91d43647031d192b1b34d52af4ddbd87ec1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 28 13:09:01 2019 -0800
Actively check cluster changes if there is no callback for a long time (#3885)
We recently encountered a site issue and suspect that the Helix callbacks for cluster changes stopped working, probably because of a ZK re-connection.
This PR enables a proactive cluster change check when no callback has been received for 1 hour.
Changes include:
1. Rewrite ClusterChangeMediator to proactively perform cluster change check
2. Disable the Helix batch-mode and perform deduplication in ClusterChangeMediator
3. Disable the Helix pre-fetch to reduce the ZK accesses
4. Add interface ClusterChangeHandler for general cluster change handling
5. Add ClusterChangeHandler implementation: ExternalViewChangeHandler, InstanceConfigChangeHandler, LiveInstanceChangeHandler
6. Add metrics to track CLUSTER_CHANGE_QUEUE_TIME and PROACTIVE_CLUSTER_CHANGE_CHECK count
Testing done:
1. Ran ClusterChangeMediator without starting the cluster change handling thread; the backup inline change processing works properly
2. Set the proactive change check interval to 1 second; both the callback and the proactive change check work properly
---
.../pinot/broker/broker/BrokerServerBuilder.java | 10 +-
.../broker/broker/helix/ClusterChangeHandler.java | 33 +---
.../broker/broker/helix/ClusterChangeMediator.java | 207 ++++++++++++---------
.../broker/helix/ExternalViewChangeHandler.java | 43 +++++
.../broker/broker/helix/HelixBrokerStarter.java | 32 +++-
.../broker/helix/InstanceConfigChangeHandler.java | 32 +---
...nerImpl.java => LiveInstanceChangeHandler.java} | 56 +++---
.../ConnectionPoolBrokerRequestHandler.java | 11 +-
.../apache/pinot/common/metrics/BrokerMeter.java | 4 +-
.../apache/pinot/common/metrics/BrokerTimer.java | 4 +-
10 files changed, 240 insertions(+), 192 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index dced4b2..39b25bf 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.LiveInstancesChangeListenerImpl;
+import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
@@ -60,7 +60,7 @@ public class BrokerServerBuilder {
private final long _delayedShutdownTimeMs;
private final RoutingTable _routingTable;
private final TimeBoundaryService _timeBoundaryService;
- private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
+ private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
private final TableQueryQuotaManager _tableQueryQuotaManager;
private final AccessControlFactory _accessControlFactory;
private final MetricsRegistry _metricsRegistry;
@@ -69,13 +69,13 @@ public class BrokerServerBuilder {
private final BrokerAdminApiApplication _brokerAdminApplication;
public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
- LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) {
+ LiveInstanceChangeHandler liveInstanceChangeHandler, TableQueryQuotaManager tableQueryQuotaManager) {
_state.set(State.INIT);
_config = config;
_delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
_routingTable = routingTable;
_timeBoundaryService = timeBoundaryService;
- _liveInstanceChangeListener = liveInstanceChangeListener;
+ _liveInstanceChangeHandler = liveInstanceChangeHandler;
_tableQueryQuotaManager = tableQueryQuotaManager;
_accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
_metricsRegistry = new MetricsRegistry();
@@ -96,7 +96,7 @@ public class BrokerServerBuilder {
} else {
LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
- _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
+ _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeHandler, _metricsRegistry);
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
similarity index 52%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
index ebd8740..e0a3e6c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
@@ -16,37 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.common.metrics;
-
-import org.apache.pinot.common.Utils;
-
+package org.apache.pinot.broker.broker.helix;
/**
- * Enumeration containing all the timers exposed by the Pinot broker.
- *
+ * Handles cluster changes such as external view changes, instance config changes, live instance changes etc.
*/
-public enum BrokerTimer implements AbstractMetrics.Timer {
- ROUTING_TABLE_UPDATE_TIME(true), ROUTING_TABLE_UPDATE_QUEUE_TIME(true);
- private final String timerName;
- private final boolean global;
-
- BrokerTimer(boolean global) {
- this.global = global;
- this.timerName = Utils.toCamelCase(name().toLowerCase());
- }
-
- @Override
- public String getTimerName() {
- return timerName;
- }
+public interface ClusterChangeHandler {
- /**
- * Returns true if the timer is global (not attached to a particular resource)
- *
- * @return true if the timer is global
- */
- @Override
- public boolean isGlobal() {
- return global;
- }
+ void processClusterChange();
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 9a8be33..1d2dbe6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -18,21 +18,21 @@
*/
package org.apache.pinot.broker.broker.helix;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.slf4j.Logger;
@@ -40,120 +40,143 @@ import org.slf4j.LoggerFactory;
/**
- * Manages the interactions between Helix cluster changes, the routing table and the connection pool.
+ * The {@code ClusterChangeMediator} handles the changes from Helix cluster.
+ *
+ * <p>If no change callback has been received for 1 hour, proactively check for changes so that changes still get
+ * processed even when callbacks stop working.
+ * <p>NOTE: Helix batch-mode is disabled and deduplication is performed in this class instead. This gives us more
+ * control over the frequency of change checks, and lets us track the cluster change queue time.
+ * <p>NOTE: Helix pre-fetch is disabled to reduce ZK accesses.
*/
-public class ClusterChangeMediator implements LiveInstanceChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener {
+@BatchMode(enabled = false)
+@PreFetch(enabled = false)
+public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
- private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- private final TableQueryQuotaManager _tableQueryQuotaManager;
- private enum UpdateType {
- EXTERNAL_VIEW, INSTANCE_CONFIG
- }
+ // Add 1 second interval between change checks to deduplicate multiple changes of the same type
+ private static final long CHANGE_CHECK_INTERVAL_MS = 1000L;
+ // If no change is received for 1 hour, proactively check for changes
+ private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
+
+ private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+ private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
+ private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
- private final LinkedBlockingQueue<Pair<UpdateType, Long>> _clusterChangeQueue = new LinkedBlockingQueue<>(1000);
+ private final Thread _clusterChangeHandlingThread;
- private Thread _deferredClusterUpdater = null;
+ private volatile boolean _stopped = false;
- public ClusterChangeMediator(HelixExternalViewBasedRouting helixExternalViewBasedRouting,
- TableQueryQuotaManager tableQueryQuotaManager, final BrokerMetrics brokerMetrics) {
- _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
- _tableQueryQuotaManager = tableQueryQuotaManager;
+ public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> changeHandlerMap, BrokerMetrics brokerMetrics) {
+ _changeHandlerMap = changeHandlerMap;
+
+ // Initialize last process time map
+ long initTime = System.currentTimeMillis();
+ for (ChangeType changeType : changeHandlerMap.keySet()) {
+ _lastProcessTimeMap.put(changeType, initTime);
+ }
- // Simple thread that polls every 10 seconds to check if there are any cluster updates to apply
- _deferredClusterUpdater = new Thread("Deferred cluster state updater") {
+ _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
@Override
public void run() {
- while (true) {
+ while (!_stopped) {
try {
- // Wait for at least one update
- Pair<UpdateType, Long> firstUpdate = _clusterChangeQueue.take();
-
- // Update the queue time metrics
- long queueTime = System.currentTimeMillis() - firstUpdate.getValue();
- brokerMetrics.addTimedValue(BrokerTimer.ROUTING_TABLE_UPDATE_QUEUE_TIME, queueTime, TimeUnit.MILLISECONDS);
-
- // Take all other updates also present
- List<Pair<UpdateType, Long>> allUpdates = new ArrayList<>();
- allUpdates.add(firstUpdate);
- _clusterChangeQueue.drainTo(allUpdates);
-
- // Gather all update types
- boolean externalViewUpdated = false;
- boolean instanceConfigUpdated = false;
-
- for (Pair<UpdateType, Long> update : allUpdates) {
- if (update.getKey() == UpdateType.EXTERNAL_VIEW) {
- externalViewUpdated = true;
- } else if (update.getKey() == UpdateType.INSTANCE_CONFIG) {
- instanceConfigUpdated = true;
+ for (Map.Entry<ChangeType, ClusterChangeHandler> entry : _changeHandlerMap.entrySet()) {
+ ChangeType changeType = entry.getKey();
+ ClusterChangeHandler changeHandler = entry.getValue();
+ long currentTime = System.currentTimeMillis();
+ Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+ if (lastChangeTime != null) {
+ brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
+ TimeUnit.MILLISECONDS);
+ processClusterChange(changeType, changeHandler);
+ } else {
+ long lastProcessTime = _lastProcessTimeMap.get(changeType);
+ if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
+ LOGGER.info("Proactive check {} change", changeType);
+ brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L);
+ processClusterChange(changeType, changeHandler);
+ }
}
}
- if (externalViewUpdated) {
- try {
- _helixExternalViewBasedRouting.processExternalViewChange();
- _tableQueryQuotaManager.processQueryQuotaChange();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while updating external view", e);
- }
- }
-
- if (instanceConfigUpdated) {
- try {
- _helixExternalViewBasedRouting.processInstanceConfigChange();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while processing instance config", e);
- }
- }
- } catch (InterruptedException e) {
- LOGGER.warn("Was interrupted while waiting for a cluster change", e);
- break;
+ // Add an interval between change checks to deduplicate multiple changes of the same type
+ Thread.sleep(CHANGE_CHECK_INTERVAL_MS);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception within cluster change handling thread", e);
}
}
-
- LOGGER.warn("Stopping deferred cluster state update thread");
- _deferredClusterUpdater = null;
}
};
+ }
- _deferredClusterUpdater.start();
+ private void processClusterChange(ChangeType changeType, ClusterChangeHandler changeHandler) {
+ long startTime = System.currentTimeMillis();
+ LOGGER.info("Start processing {} change", changeType);
+ changeHandler.processClusterChange();
+ long endTime = System.currentTimeMillis();
+ LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - startTime);
+ _lastProcessTimeMap.put(changeType, endTime);
+ }
+
+ /**
+ * Starts the cluster change mediator.
+ */
+ public void start() {
+ LOGGER.info("Starting the cluster change handling thread");
+ _clusterChangeHandlingThread.start();
+ }
+
+ /**
+ * Stops the cluster change mediator.
+ */
+ public void stop() {
+ LOGGER.info("Stopping the cluster change handling thread");
+ _stopped = true;
+ try {
+ _clusterChangeHandlingThread.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("Caught InterruptedException while waiting for cluster change handling thread to die");
+ Thread.currentThread().interrupt();
+ }
}
@Override
public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
- // If the deferred update thread is alive, defer the update
- if (_deferredClusterUpdater != null && _deferredClusterUpdater.isAlive()) {
- try {
- _clusterChangeQueue.put(new ImmutablePair<>(UpdateType.EXTERNAL_VIEW, System.currentTimeMillis()));
- } catch (InterruptedException e) {
- LOGGER.warn("Was interrupted while trying to add external view change to queue", e);
- }
- } else {
- LOGGER.warn(
- "Deferred cluster updater thread is null or stopped, not deferring external view routing table rebuild");
- _helixExternalViewBasedRouting.processExternalViewChange();
- _tableQueryQuotaManager.processQueryQuotaChange();
- }
+ // External view list should be empty because Helix pre-fetch is disabled
+ assert externalViewList.isEmpty();
+
+ enqueueChange(ChangeType.EXTERNAL_VIEW);
}
@Override
- public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) {
- // If the deferred update thread is alive, defer the update
- if (_deferredClusterUpdater != null && _deferredClusterUpdater.isAlive()) {
- try {
- _clusterChangeQueue.put(new ImmutablePair<>(UpdateType.INSTANCE_CONFIG, System.currentTimeMillis()));
- } catch (InterruptedException e) {
- LOGGER.warn("Was interrupted while trying to add external view change to queue", e);
- }
- } else {
- LOGGER.warn(
- "Deferred cluster updater thread is null or stopped, not deferring instance config change notification");
- _helixExternalViewBasedRouting.processInstanceConfigChange();
- }
+ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext changeContext) {
+ // Instance config list should be empty because Helix pre-fetch is disabled
+ assert instanceConfigs.isEmpty();
+
+ enqueueChange(ChangeType.INSTANCE_CONFIG);
}
@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+ // Live instance list should be empty because Helix pre-fetch is disabled
+ assert liveInstances.isEmpty();
+
+ enqueueChange(ChangeType.LIVE_INSTANCE);
+ }
+
+ /**
+ * Helper method to enqueue a change from the Helix callback to be processed by the cluster change handling thread. If
+ * the handling thread is dead, directly process the change.
+ *
+ * @param changeType Type of the change
+ */
+ private void enqueueChange(ChangeType changeType) {
+ if (_clusterChangeHandlingThread.isAlive()) {
+ LOGGER.info("Enqueue {} change", changeType);
+ _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+ } else {
+ LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType);
+ processClusterChange(changeType, _changeHandlerMap.get(changeType));
+ }
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
new file mode 100644
index 0000000..1422953
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+
+
+/**
+ * Cluster change handler for external view changes.
+ */
+public class ExternalViewChangeHandler implements ClusterChangeHandler {
+ private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+ private final TableQueryQuotaManager _tableQueryQuotaManager;
+
+ public ExternalViewChangeHandler(HelixExternalViewBasedRouting helixExternalViewBasedRouting,
+ TableQueryQuotaManager tableQueryQuotaManager) {
+ _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
+ _tableQueryQuotaManager = tableQueryQuotaManager;
+ }
+
+ @Override
+ public void processClusterChange() {
+ _helixExternalViewBasedRouting.processExternalViewChange();
+ _tableQueryQuotaManager.processQueryQuotaChange();
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index c446c70..1ff0de6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -21,12 +21,15 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.collect.ImmutableList;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -68,9 +71,10 @@ public class HelixBrokerStarter {
private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
private final BrokerServerBuilder _brokerServerBuilder;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final LiveInstancesChangeListenerImpl _liveInstancesListener;
+ private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
private final MetricsRegistry _metricsRegistry;
private final TableQueryQuotaManager _tableQueryQuotaManager;
+ private final ClusterChangeMediator _clusterChangeMediator;
private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
// Set after broker is started, which is actually in the constructor.
@@ -90,8 +94,6 @@ public class HelixBrokerStarter {
throws Exception {
LOGGER.info("Starting Pinot broker");
- _liveInstancesListener = new LiveInstancesChangeListenerImpl(helixClusterName);
-
_pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
if (brokerHost == null) {
@@ -118,14 +120,21 @@ public class HelixBrokerStarter {
_helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
_tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager);
+ _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager);
+ Map<ChangeType, ClusterChangeHandler> clusterChangeHandlerMap = new HashMap<>();
+ clusterChangeHandlerMap.put(ChangeType.EXTERNAL_VIEW,
+ new ExternalViewChangeHandler(_helixExternalViewBasedRouting, _tableQueryQuotaManager));
+ clusterChangeHandlerMap
+ .put(ChangeType.INSTANCE_CONFIG, new InstanceConfigChangeHandler(_helixExternalViewBasedRouting));
+ clusterChangeHandlerMap.put(ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandler);
_brokerServerBuilder = startBroker(_pinotHelixProperties);
_metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
- ClusterChangeMediator clusterChangeMediator =
- new ClusterChangeMediator(_helixExternalViewBasedRouting, _tableQueryQuotaManager,
- _brokerServerBuilder.getBrokerMetrics());
- _spectatorHelixManager.addExternalViewChangeListener(clusterChangeMediator);
- _spectatorHelixManager.addInstanceConfigChangeListener(clusterChangeMediator);
- _spectatorHelixManager.addLiveInstanceChangeListener(_liveInstancesListener);
+ _clusterChangeMediator =
+ new ClusterChangeMediator(clusterChangeHandlerMap, _brokerServerBuilder.getBrokerMetrics());
+ _clusterChangeMediator.start();
+ _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
+ _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
+ _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
// Connect participant Helix manager.
_helixManager =
@@ -193,7 +202,7 @@ public class HelixBrokerStarter {
config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
}
BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
- _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager);
+ _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
_accessControlFactory = brokerServerBuilder.getAccessControlFactory();
_helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
_tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
@@ -281,10 +290,13 @@ public class HelixBrokerStarter {
LOGGER.info("Disconnecting spectator Helix manager");
_spectatorHelixManager.disconnect();
}
+
if (_tbiMessageHandler != null) {
LOGGER.info("Shutting down timeboundary info refresh message handler");
_tbiMessageHandler.shutdown();
}
+
+ _clusterChangeMediator.stop();
}
public MetricsRegistry getMetricsRegistry() {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
similarity index 53%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
index ebd8740..bfba1af 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
@@ -16,37 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.broker.broker.helix;
-import org.apache.pinot.common.Utils;
+import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
/**
- * Enumeration containing all the timers exposed by the Pinot broker.
- *
+ * Cluster change handler for instance config changes.
*/
-public enum BrokerTimer implements AbstractMetrics.Timer {
- ROUTING_TABLE_UPDATE_TIME(true), ROUTING_TABLE_UPDATE_QUEUE_TIME(true);
- private final String timerName;
- private final boolean global;
-
- BrokerTimer(boolean global) {
- this.global = global;
- this.timerName = Utils.toCamelCase(name().toLowerCase());
- }
+public class InstanceConfigChangeHandler implements ClusterChangeHandler {
+ private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- @Override
- public String getTimerName() {
- return timerName;
+ public InstanceConfigChangeHandler(HelixExternalViewBasedRouting helixExternalViewBasedRouting) {
+ _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
}
- /**
- * Returns true if the timer is global (not attached to a particular resource)
- *
- * @return true if the timer is global
- */
@Override
- public boolean isGlobal() {
- return global;
+ public void processClusterChange() {
+ _helixExternalViewBasedRouting.processInstanceConfigChange();
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstancesChangeListenerImpl.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
similarity index 66%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstancesChangeListenerImpl.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
index 5c192ad..aaecb47 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstancesChangeListenerImpl.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
@@ -21,8 +21,9 @@ package org.apache.pinot.broker.broker.helix;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
import org.apache.helix.model.LiveInstance;
import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
@@ -32,35 +33,40 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LiveInstancesChangeListenerImpl implements LiveInstanceChangeListener {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(LiveInstancesChangeListenerImpl.class);
+/**
+ * Cluster change handler for live instance changes.
+ */
+public class LiveInstanceChangeHandler implements ClusterChangeHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LiveInstanceChangeHandler.class);
private static final boolean DO_NOT_RECREATE = false;
- private long timeout;
- private final Map<String, String> liveInstanceToSessionIdMap;
- private KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> connectionPool;
+ private final HelixDataAccessor _helixDataAccessor;
+ private final PropertyKey _liveInstancesKey;
- public LiveInstancesChangeListenerImpl(String clusterName) {
- this.liveInstanceToSessionIdMap = new HashMap<String, String>();
+ private KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> _connectionPool;
+ private Map<String, String> _liveInstanceToSessionIdMap;
+
+ public LiveInstanceChangeHandler(HelixManager helixManager) {
+ _helixDataAccessor = helixManager.getHelixDataAccessor();
+ _liveInstancesKey = new PropertyKey.Builder(helixManager.getClusterName()).liveInstances();
}
- public void init(final KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> connectionPool,
- final long timeout) {
- this.connectionPool = connectionPool;
- this.timeout = timeout;
+ public void init(KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> connectionPool) {
+ _connectionPool = connectionPool;
+ _liveInstanceToSessionIdMap = new HashMap<>();
}
@Override
- public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
- if (connectionPool == null) {
- LOGGER.warn("init has not been called on the live instances listener, ignoring live instance change.");
+ public void processClusterChange() {
+ // Skip processing live instance change for single-connection routing
+ if (_connectionPool == null) {
return;
}
- for (LiveInstance instance : liveInstances) {
+ List<LiveInstance> liveInstances = _helixDataAccessor.getChildValues(_liveInstancesKey);
+ for (LiveInstance instance : liveInstances) {
String instanceId = instance.getInstanceName();
String sessionId = instance.getSessionId();
@@ -80,15 +86,15 @@ public class LiveInstancesChangeListenerImpl implements LiveInstanceChangeListen
.warn("Port for server instance {} does not appear to be numeric, defaulting to {}.", instanceId, port, e);
}
- if (liveInstanceToSessionIdMap.containsKey(instanceId)) {
+ if (_liveInstanceToSessionIdMap.containsKey(instanceId)) {
// sessionId has changed
- if (!sessionId.equals(liveInstanceToSessionIdMap.get(instanceId))) {
+ if (!sessionId.equals(_liveInstanceToSessionIdMap.get(instanceId))) {
try {
LOGGER.info("Instance {} has changed session id {} -> {}, validating connection pool for this instance.",
- instanceId, sessionId, liveInstanceToSessionIdMap.get(instanceId));
+ instanceId, sessionId, _liveInstanceToSessionIdMap.get(instanceId));
ServerInstance ins = ServerInstance.forHostPort(hostName, port);
- connectionPool.validatePool(ins, DO_NOT_RECREATE);
- liveInstanceToSessionIdMap.put(instanceId, sessionId);
+ _connectionPool.validatePool(ins, DO_NOT_RECREATE);
+ _liveInstanceToSessionIdMap.put(instanceId, sessionId);
} catch (Exception e) {
LOGGER.error("Error trying to validate & destroy dead connections for {}", instanceId, e);
}
@@ -99,8 +105,8 @@ public class LiveInstancesChangeListenerImpl implements LiveInstanceChangeListen
// lets first check if the connection is valid or not
try {
ServerInstance ins = ServerInstance.forHostPort(hostName, port);
- connectionPool.validatePool(ins, DO_NOT_RECREATE);
- liveInstanceToSessionIdMap.put(instanceId, sessionId);
+ _connectionPool.validatePool(ins, DO_NOT_RECREATE);
+ _liveInstanceToSessionIdMap.put(instanceId, sessionId);
} catch (Exception e) {
LOGGER.error("Error trying to destroy dead connections for {}", instanceId, e);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index 068a137..0202315 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.broker.helix.LiveInstancesChangeListenerImpl;
+import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -51,7 +51,6 @@ import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.serde.SerDe;
@@ -80,7 +79,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolBrokerRequestHandler.class);
private static final String TRANSPORT_CONFIG_PREFIX = "pinot.broker.transport";
- private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
+ private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
private final EventLoopGroup _eventLoopGroup;
private final ScheduledThreadPoolExecutor _poolTimeoutExecutor;
private final ExecutorService _requestSenderPool;
@@ -90,9 +89,9 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
- LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) {
+ LiveInstanceChangeHandler liveInstanceChangeHandler, MetricsRegistry metricsRegistry) {
super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
- _liveInstanceChangeListener = liveInstanceChangeListener;
+ _liveInstanceChangeHandler = liveInstanceChangeHandler;
TransportClientConf transportClientConf = new TransportClientConf();
transportClientConf.init(_config.subset(TRANSPORT_CONFIG_PREFIX));
@@ -120,7 +119,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
@Override
public synchronized void start() {
_connPool.start();
- _liveInstanceChangeListener.init(_connPool, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS);
+ _liveInstanceChangeHandler.init(_connPool);
}
@Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 55c05a5..170e745 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -97,7 +97,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// Netty connection metrics
NETTY_CONNECTION_REQUESTS_SENT("nettyConnection", true),
NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
- NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true);
+ NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+
+ PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true);
private final String brokerMeterName;
private final String unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
index ebd8740..694db60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
@@ -26,7 +26,9 @@ import org.apache.pinot.common.Utils;
*
*/
public enum BrokerTimer implements AbstractMetrics.Timer {
- ROUTING_TABLE_UPDATE_TIME(true), ROUTING_TABLE_UPDATE_QUEUE_TIME(true);
+ ROUTING_TABLE_UPDATE_TIME(true),
+ CLUSTER_CHANGE_QUEUE_TIME(true);
+
private final String timerName;
private final boolean global;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org