You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:43 UTC
[24/50] [abbrv] helix git commit: Adding support to batch ZK callback
optionally by setting sys var asyncBatchModeEnabled=true
Adding support to batch ZK callback optionally by setting sys var asyncBatchModeEnabled=true
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/384978a2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/384978a2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/384978a2
Branch: refs/heads/master
Commit: 384978a2e16ab0f4adb388e32c7e448c77996ca2
Parents: 015a73c
Author: kishoreg <ki...@apache.org>
Authored: Fri Mar 24 10:48:05 2017 -0700
Committer: kishoreg <ki...@apache.org>
Committed: Fri Mar 24 10:48:05 2017 -0700
----------------------------------------------------------------------
.../helix/manager/zk/CallbackHandler.java | 130 ++++++++++++-------
1 file changed, 86 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/384978a2/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index c3e8206..90df56d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -31,6 +31,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.I0Itec.zkclient.IZkChildListener;
@@ -53,7 +55,7 @@ import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyPathConfig;
import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
@@ -89,15 +91,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
private final AtomicLong _lastNotificationTimeStamp;
private final HelixManager _manager;
private final PropertyKey _propertyKey;
-
+ BlockingQueue<NotificationContext> _queue = new LinkedBlockingQueue<NotificationContext>(1000);
+ private static boolean asyncBatchModeEnabled = false;
+ static {
+ asyncBatchModeEnabled = Boolean.parseBoolean(System.getProperty("isAsyncBatchModeEnabled"));
+ logger.info("isAsyncBatchModeEnabled: " + asyncBatchModeEnabled);
+ }
/**
* maintain the expected notification types
* this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
*/
private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
- public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
- Object listener, EventType[] eventTypes, ChangeType changeType) {
+ public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey, Object listener, EventType[] eventTypes, ChangeType changeType) {
if (listener == null) {
throw new HelixException("listener could not be null");
}
@@ -111,6 +117,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
this._eventTypes = eventTypes;
this._changeType = changeType;
this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+ this._queue = new LinkedBlockingQueue<NotificationContext>(1000);
+ if (asyncBatchModeEnabled) {
+ new Thread(new CallbackInvoker(this)).start();
+ }
init();
}
@@ -122,13 +132,58 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
return _path;
}
+ class CallbackInvoker implements Runnable {
+ private CallbackHandler handler;
+
+ CallbackInvoker(CallbackHandler handler) {
+ this.handler = handler;
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ NotificationContext notificationToProcess = _queue.take();
+ int mergedCallbacks = 0;
+ // remove all elements in the queue that have the same type
+ while (true) {
+ NotificationContext nextItem = _queue.peek();
+ if (nextItem != null && notificationToProcess.getType() == nextItem.getType()) {
+ notificationToProcess = _queue.take();
+ mergedCallbacks++;
+ } else {
+ break;
+ }
+ }
+ try {
+ logger.info("Num callbacks merged for path:" + handler.getPath() + " : " + mergedCallbacks);
+ handler.invoke(notificationToProcess);
+ } catch (Exception e) {
+ logger.warn("Exception in callback processing thread. Skipping callback", e);
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted exception in callback processing thread. Exiting thread, new callbacks will not be processed", e);
+ break;
+ }
+ }
+ }
+ }
+
+ public void enqueueTask(NotificationContext changeContext) throws Exception {
+ //async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the callback's immediately.
+ if (asyncBatchModeEnabled && changeContext.getType() != NotificationContext.Type.CALLBACK) {
+ logger.info("Enqueuing callback");
+ _queue.put(changeContext);
+ } else {
+ invoke(changeContext);
+ }
+ }
+
public void invoke(NotificationContext changeContext) throws Exception {
// This allows the listener to work with one change at a time
synchronized (_manager) {
Type type = changeContext.getType();
if (!_expectTypes.contains(type)) {
- logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
- + ", expected types: " + _expectTypes + " but was " + type);
+ logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path + ", expected types: " + _expectTypes + " but was " + type);
return;
}
_expectTypes = nextNotificationType.get(type);
@@ -136,8 +191,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
// Builder keyBuilder = _accessor.keyBuilder();
long start = System.currentTimeMillis();
if (logger.isInfoEnabled()) {
- logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
- + _listener.getClass().getCanonicalName());
+ logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:" + _listener.getClass().getCanonicalName());
}
if (_changeType == IDEAL_STATE) {
@@ -165,18 +219,16 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
listener.onConfigChange(configs, changeContext);
} else if (_changeType == LIVE_INSTANCE) {
- LiveInstanceChangeListener liveInstanceChangeListener =
- (LiveInstanceChangeListener) _listener;
+ LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener) _listener;
subscribeForChanges(changeContext, _path, true, true);
List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
} else if (_changeType == CURRENT_STATE) {
- CurrentStateChangeListener currentStateChangeListener =
- (CurrentStateChangeListener) _listener;
+ CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener;
subscribeForChanges(changeContext, _path, true, true);
- String instanceName = PropertyPathBuilder.getInstanceNameFromPath(_path);
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
@@ -185,7 +237,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
} else if (_changeType == MESSAGE) {
MessageListener messageListener = (MessageListener) _listener;
subscribeForChanges(changeContext, _path, true, false);
- String instanceName = PropertyPathBuilder.getInstanceNameFromPath(_path);
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
List<Message> messages = _accessor.getChildValues(_propertyKey);
messageListener.onMessage(instanceName, messages, changeContext);
@@ -211,8 +263,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
long end = System.currentTimeMillis();
if (logger.isInfoEnabled()) {
- logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
- + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
+ logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:" + _listener.getClass().getCanonicalName() + " Took: " + (end - start)
+ + "ms");
}
}
}
@@ -220,12 +272,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
private void subscribeChildChange(String path, NotificationContext context) {
NotificationContext.Type type = context.getType();
if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
- logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path
- + ", listener: " + _listener);
+ logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener);
_zkClient.subscribeChildChanges(path, this);
} else if (type == NotificationContext.Type.FINALIZE) {
- logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
- + ", listener: " + _listener);
+ logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener);
_zkClient.unsubscribeChildChanges(path, this);
}
@@ -235,22 +285,20 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
NotificationContext.Type type = context.getType();
if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
if (logger.isDebugEnabled()) {
- logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
- + ", listener: " + _listener);
+ logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: " + _listener);
}
_zkClient.subscribeDataChanges(path, this);
} else if (type == NotificationContext.Type.FINALIZE) {
- logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
- + ", listener: " + _listener);
+ logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " + _listener);
_zkClient.unsubscribeDataChanges(path, this);
}
}
// TODO watchParent is always true. consider remove it
- private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
- boolean watchChild) {
+ private void subscribeForChanges(NotificationContext context, String path, boolean watchParent, boolean watchChild) {
+ long start = System.currentTimeMillis();
if (watchParent) {
subscribeChildChange(path, context);
}
@@ -301,10 +349,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
}
}
} catch (ZkNoNodeException e) {
- logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
- + _listener, e);
+ logger.warn("fail to subscribe child/data change. path: " + path + ", listener: " + _listener, e);
}
}
+ long end = System.currentTimeMillis();
+ logger.info("Subcribing to path:" + path + " took:" + (end - start));
}
@@ -321,7 +370,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
try {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.INIT);
- invoke(changeContext);
+ enqueueTask(changeContext);
} catch (Exception e) {
String msg = "Exception while invoking init callback for listener:" + _listener;
ZKExceptionHandler.getInstance().handle(msg, e);
@@ -335,11 +384,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
if (dataPath != null && dataPath.startsWith(_path)) {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
+ enqueueTask(changeContext);
}
} catch (Exception e) {
- String msg =
- "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+ String msg = "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
ZKExceptionHandler.getInstance().handle(msg, e);
}
}
@@ -349,14 +397,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
try {
updateNotificationTime(System.nanoTime());
if (dataPath != null && dataPath.startsWith(_path)) {
- logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
- + ", listener: " + _listener);
+ logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath + ", listener: " + _listener);
_zkClient.unsubscribeDataChanges(dataPath, this);
// only needed for bucketized parent, but OK if we don't have child-change
// watch on the bucketized parent path
- logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
- + ", listener: " + _listener);
+ logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath + ", listener: " + _listener);
_zkClient.unsubscribeChildChanges(dataPath, this);
// No need to invoke() since this event will handled by child-change on parent-node
// NotificationContext changeContext = new NotificationContext(_manager);
@@ -364,9 +410,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
// invoke(changeContext);
}
} catch (Exception e) {
- String msg =
- "exception in handling data-delete-change. path: " + dataPath + ", listener: "
- + _listener;
+ String msg = "exception in handling data-delete-change. path: " + dataPath + ", listener: " + _listener;
ZKExceptionHandler.getInstance().handle(msg, e);
}
}
@@ -384,13 +428,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
_manager.removeListener(_propertyKey, _listener);
} else {
changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
+ enqueueTask(changeContext);
}
}
} catch (Exception e) {
- String msg =
- "exception in handling child-change. instance: " + _manager.getInstanceName()
- + ", parentPath: " + parentPath + ", listener: " + _listener;
+ String msg = "exception in handling child-change. instance: " + _manager.getInstanceName() + ", parentPath: " + parentPath + ", listener: " + _listener;
ZKExceptionHandler.getInstance().handle(msg, e);
}
}
@@ -402,7 +444,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
try {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.FINALIZE);
- invoke(changeContext);
+ enqueueTask(changeContext);
} catch (Exception e) {
String msg = "Exception while resetting the listener:" + _listener;
ZKExceptionHandler.getInstance().handle(msg, e);