You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/01 23:47:18 UTC
git commit: HELIX-37: Cleanup CallbackHandler
Updated Branches:
refs/heads/master 47d1e650e -> a8ee977be
HELIX-37: Cleanup CallbackHandler
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/a8ee977b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/a8ee977b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/a8ee977b
Branch: refs/heads/master
Commit: a8ee977bee59d5e6b29c08420d59634d65691bf4
Parents: 47d1e65
Author: zzhang <zz...@uci.edu>
Authored: Fri Feb 1 14:47:07 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Fri Feb 1 14:47:07 2013 -0800
----------------------------------------------------------------------
.../apache/helix/manager/zk/CallbackHandler.java | 151 ++++++++++----
1 files changed, 108 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a8ee977b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 61dd59a..0cc72b5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
@@ -41,12 +42,14 @@ import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -69,7 +72,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
private final HelixDataAccessor _accessor;
private final ChangeType _changeType;
private final ZkClient _zkClient;
- private final AtomicLong lastNotificationTimeStamp;
+ private final AtomicLong _lastNotificationTimeStamp;
private final HelixManager _manager;
public CallbackHandler(HelixManager manager,
@@ -86,7 +89,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
this._listener = listener;
this._eventTypes = eventTypes;
this._changeType = changeType;
- lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+ this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
init();
}
@@ -110,7 +113,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
if (logger.isInfoEnabled())
{
logger.info(Thread.currentThread().getId() + " START:INVOKE "
- // + changeContext.getPathChanged()
+ _path + " listener:" + _listener.getClass().getCanonicalName());
}
@@ -228,62 +230,116 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
}
}
- private void subscribeForChanges(NotificationContext context,
- String path,
- boolean watchParent,
- boolean watchChild)
+ private void subscribeChildChange(String path, NotificationContext context)
{
- NotificationContext.Type type = context.getType();
- if (watchParent)
- {
- if (type == NotificationContext.Type.INIT
- || type == NotificationContext.Type.CALLBACK)
+ NotificationContext.Type type = context.getType();
+ if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
{
- logger.info(_manager.getInstanceName() + " subscribe child change@" + path);
+ logger.info(_manager.getInstanceName() + " subscribes child-change. path: "
+ + path + ", listener: " + _listener);
_zkClient.subscribeChildChanges(path, this);
}
else if (type == NotificationContext.Type.FINALIZE)
{
- logger.info(_manager.getInstanceName() + " UNsubscribe child change@" + path);
+ logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
+ + path + ", listener: " + _listener);
_zkClient.unsubscribeChildChanges(path, this);
}
+ }
+
+ private void subscribeDataChange(String path, NotificationContext context)
+ {
+ NotificationContext.Type type = context.getType();
+ if (type == NotificationContext.Type.INIT
+ || type == NotificationContext.Type.CALLBACK)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug(_manager.getInstanceName() + " subscribe data-change. path: "
+ + path + ", listener: " + _listener);
+ }
+ _zkClient.subscribeDataChanges(path, this);
+
+ }
+ else if (type == NotificationContext.Type.FINALIZE)
+ {
+ logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: "
+ + path + ", listener: " + _listener);
+ _zkClient.unsubscribeDataChanges(path, this);
+ }
+ }
+
+ private void subscribeForChanges(NotificationContext context,
+ String path,
+ boolean watchParent,
+ boolean watchChild)
+ {
+ if (watchParent)
+ {
+ subscribeChildChange(path, context);
}
if (watchChild)
{
try
{
- List<String> childNames = _zkClient.getChildren(path);
- if (childNames == null || childNames.size() == 0)
+ switch(_changeType)
+ {
+ case CURRENT_STATE:
+ case IDEAL_STATE:
+ case EXTERNAL_VIEW:
{
- return;
+ // check if bucketized
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
+ for (ZNRecord record : records)
+ {
+ HelixProperty property = new HelixProperty(record);
+ String childPath = path + "/" + record.getId();
+
+ int bucketSize = property.getBucketSize();
+ if (bucketSize > 0)
+ {
+ // subscribe both data-change and child-change on bucketized parent node
+ // data-change gives a delete-callback which is used to remove watch
+ subscribeChildChange(childPath, context);
+ subscribeDataChange(childPath, context);
+
+ // subscribe data-change on bucketized child
+ List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
+ if (bucketizedChildNames != null)
+ {
+ for (String bucketizedChildName : bucketizedChildNames)
+ {
+ String bucketizedChildPath = childPath + "/" + bucketizedChildName;
+ subscribeDataChange(bucketizedChildPath, context);
+ }
+ }
+ } else
+ {
+ subscribeDataChange(childPath, context);
+ }
+ }
+ break;
}
-
- for (String childName : childNames)
+ default:
{
- String childPath = path + "/" + childName;
- if (type == NotificationContext.Type.INIT
- || type == NotificationContext.Type.CALLBACK)
- {
- if (logger.isDebugEnabled())
+ List<String> childNames = _zkClient.getChildren(path);
+ if (childNames != null)
{
- logger.debug(_manager.getInstanceName() + " subscribe data change@" + childPath);
+ for (String childName : childNames)
+ {
+ String childPath = path + "/" + childName;
+ subscribeDataChange(childPath, context);
+ }
}
- _zkClient.subscribeDataChanges(childPath, this);
-
- }
- else if (type == NotificationContext.Type.FINALIZE)
- {
- logger.info(_manager.getInstanceName() + " UNsubscribe data change@" + childPath);
- _zkClient.unsubscribeDataChanges(childPath, this);
- }
-
- subscribeForChanges(context, childPath, watchParent, watchChild);
+ break;
}
+ }
}
catch (ZkNoNodeException e)
{
- logger.warn("fail to subscribe child data change@" + path);
+ logger.warn("fail to subscribe child/data change@" + path, e);
}
}
@@ -341,10 +397,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
updateNotificationTime(System.nanoTime());
if (dataPath != null && dataPath.startsWith(_path))
{
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- _zkClient.unsubscribeChildChanges(dataPath, this);
- invoke(changeContext);
+ logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: "
+ + dataPath + ", listener: " + _listener);
+ _zkClient.unsubscribeDataChanges(dataPath, this);
+
+ // only needed for bucketized parent, but OK if we don't have child-change watch on the path
+ logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
+ + dataPath + ", listener: " + _listener);
+ _zkClient.unsubscribeChildChanges(dataPath, this);
+
+ // No need to invoke() since this event will handled by child-change on parent-node
+// NotificationContext changeContext = new NotificationContext(_manager);
+// changeContext.setType(NotificationContext.Type.CALLBACK);
+// invoke(changeContext);
}
}
catch (Exception e)
@@ -392,17 +457,17 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
private void updateNotificationTime(long nanoTime)
{
- long l = lastNotificationTimeStamp.get();
+ long l = _lastNotificationTimeStamp.get();
while (nanoTime > l)
{
- boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
+ boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
if (b)
{
break;
}
else
{
- l = lastNotificationTimeStamp.get();
+ l = _lastNotificationTimeStamp.get();
}
}
}