You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/03/01 18:34:33 UTC
[incubator-pinot] branch master updated: In ClusterChangeMediator,
remove sleep and make it notify based (#3898)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9dda23f In ClusterChangeMediator, remove sleep and make it notify based (#3898)
9dda23f is described below
commit 9dda23f791c0b2fd6f09c83e973516941e724748
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Mar 1 10:34:28 2019 -0800
In ClusterChangeMediator, remove sleep and make it notify based (#3898)
When we get the Helix cluster change callbacks, we want to process them ASAP
Replace sleep() with wait() and notify() so that the new changes are processed immediately
Testing done:
- Check that the queue time matches with the processing time
- Run tests without starting the cluster change handling thread
- Reduce proactive change check interval to 1 second and check it interacts correctly with Helix callbacks
---
.../broker/broker/helix/ClusterChangeMediator.java | 39 +++++++++++++++-------
1 file changed, 27 insertions(+), 12 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 1d2dbe6..0653ddc 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -18,9 +18,9 @@
*/
package org.apache.pinot.broker.broker.helix;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.NotificationContext;
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
* <p>
* <p>If there is no change callback in 1 hour, proactively check changes so that the changes are getting processed even
* when callbacks stop working.
- * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can give us more control on the
- * frequency of change checks, and let us track the cluster change queue time.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. This can save us the extra threads for
+ * handling Helix batch-mode, and let us track the cluster change queue time.
* <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
*/
@BatchMode(enabled = false)
@@ -53,14 +53,12 @@ import org.slf4j.LoggerFactory;
public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);
- // Add 1 second interval between change checks to deduplicate multiple changes of the same type
- private static final long CHANGE_CHECK_INTERVAL_MS = 1000L;
// If no change got for 1 hour, proactively check changes
private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
- private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();
- private final Map<ChangeType, Long> _lastProcessTimeMap = new ConcurrentHashMap<>();
+ private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>();
+ private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>();
private final Thread _clusterChangeHandlingThread;
@@ -84,7 +82,10 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
ChangeType changeType = entry.getKey();
ClusterChangeHandler changeHandler = entry.getValue();
long currentTime = System.currentTimeMillis();
- Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+ Long lastChangeTime;
+ synchronized (_lastChangeTimeMap) {
+ lastChangeTime = _lastChangeTimeMap.remove(changeType);
+ }
if (lastChangeTime != null) {
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime,
TimeUnit.MILLISECONDS);
@@ -98,10 +99,16 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
}
}
}
-
- // Add an interval between change checks to deduplicate multiple changes of the same type
- Thread.sleep(CHANGE_CHECK_INTERVAL_MS);
+ synchronized (_lastChangeTimeMap) {
+ // Wait for at most 1/10 of proactive change check interval if no new event received. This can guarantee
+ // that the proactive change check will not be delayed for more than 1/10 of the interval. In case of
+ // spurious wakeup, execute the while loop again for the proactive change check.
+ if (_lastChangeTimeMap.isEmpty()) {
+ _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 10);
+ }
+ }
} catch (Exception e) {
+ // Ignore all exceptions. The thread keeps running until ClusterChangeMediator.stop() is invoked.
LOGGER.error("Caught exception within cluster change handling thread", e);
}
}
@@ -132,6 +139,9 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
public void stop() {
LOGGER.info("Stopping the cluster change handling thread");
_stopped = true;
+ synchronized (_lastChangeTimeMap) {
+ _lastChangeTimeMap.notify();
+ }
try {
_clusterChangeHandlingThread.join();
} catch (InterruptedException e) {
@@ -173,7 +183,12 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
private void enqueueChange(ChangeType changeType) {
if (_clusterChangeHandlingThread.isAlive()) {
LOGGER.info("Enqueue {} change", changeType);
- _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+ synchronized (_lastChangeTimeMap) {
+ if (!_lastChangeTimeMap.containsKey(changeType)) {
+ _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+ _lastChangeTimeMap.notify();
+ }
+ }
} else {
LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType);
processClusterChange(changeType, _changeHandlerMap.get(changeType));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org