You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/01 23:47:18 UTC

git commit: HELIX-37: Cleanup CallbackHandler

Updated Branches:
  refs/heads/master 47d1e650e -> a8ee977be


HELIX-37: Cleanup CallbackHandler


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/a8ee977b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/a8ee977b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/a8ee977b

Branch: refs/heads/master
Commit: a8ee977bee59d5e6b29c08420d59634d65691bf4
Parents: 47d1e65
Author: zzhang <zz...@uci.edu>
Authored: Fri Feb 1 14:47:07 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Fri Feb 1 14:47:07 2013 -0800

----------------------------------------------------------------------
 .../apache/helix/manager/zk/CallbackHandler.java   |  151 ++++++++++----
 1 files changed, 108 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a8ee977b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 61dd59a..0cc72b5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
@@ -41,12 +42,14 @@ import org.apache.helix.HealthStateChangeListener;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -69,7 +72,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
   private final HelixDataAccessor _accessor;
   private final ChangeType        _changeType;
   private final ZkClient          _zkClient;
-  private final AtomicLong        lastNotificationTimeStamp;
+  private final AtomicLong        _lastNotificationTimeStamp;
   private final HelixManager      _manager;
 
   public CallbackHandler(HelixManager manager,
@@ -86,7 +89,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     this._listener = listener;
     this._eventTypes = eventTypes;
     this._changeType = changeType;
-    lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+    this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
     init();
   }
 
@@ -110,7 +113,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       if (logger.isInfoEnabled())
       {
         logger.info(Thread.currentThread().getId() + " START:INVOKE "
-        // + changeContext.getPathChanged()
             + _path + " listener:" + _listener.getClass().getCanonicalName());
       }
 
@@ -228,62 +230,116 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     }
   }
 
-  private void subscribeForChanges(NotificationContext context,
-                                   String path,
-                                   boolean watchParent,
-                                   boolean watchChild)
+  private void subscribeChildChange(String path, NotificationContext context)
   {
-    NotificationContext.Type type = context.getType();
-    if (watchParent)
-    {
-      if (type == NotificationContext.Type.INIT
-          || type == NotificationContext.Type.CALLBACK)
+	  NotificationContext.Type type = context.getType();
+      if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
       {
-        logger.info(_manager.getInstanceName() + " subscribe child change@" + path);
+        logger.info(_manager.getInstanceName() + " subscribes child-change. path: " 
+        		+ path + ", listener: " + _listener);
         _zkClient.subscribeChildChanges(path, this);
       }
       else if (type == NotificationContext.Type.FINALIZE)
       {
-        logger.info(_manager.getInstanceName() + " UNsubscribe child change@" + path);
+        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " 
+        		+ path + ", listener: " + _listener);
         _zkClient.unsubscribeChildChanges(path, this);
       }
+  }
+  
+  private void subscribeDataChange(String path, NotificationContext context)
+  {
+    	NotificationContext.Type type = context.getType();
+        if (type == NotificationContext.Type.INIT
+            || type == NotificationContext.Type.CALLBACK)
+        {
+          if (logger.isDebugEnabled())
+          {
+            logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " 
+            		+ path + ", listener: " + _listener);
+          }
+          _zkClient.subscribeDataChanges(path, this);
+
+        }
+        else if (type == NotificationContext.Type.FINALIZE)
+        {
+          logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " 
+        		  + path + ", listener: " + _listener);
+          _zkClient.unsubscribeDataChanges(path, this);
+        }
+  }
+  
+  private void subscribeForChanges(NotificationContext context,
+                                   String path,
+                                   boolean watchParent,
+                                   boolean watchChild)
+  {
+    if (watchParent)
+    {
+    	subscribeChildChange(path, context);
     }
 
     if (watchChild)
     {
       try
       {
-        List<String> childNames = _zkClient.getChildren(path);
-        if (childNames == null || childNames.size() == 0)
+    	switch(_changeType)
+    	{
+        case CURRENT_STATE:
+        case IDEAL_STATE:
+        case EXTERNAL_VIEW:
         {
-          return;
+            // check if bucketized
+        	BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+        	List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
+        	for (ZNRecord record : records)
+        	{
+                HelixProperty property = new HelixProperty(record);
+            	String childPath = path + "/" + record.getId();
+
+                int bucketSize = property.getBucketSize();
+                if (bucketSize > 0)
+                {
+                  // subscribe both data-change and child-change on bucketized parent node
+                  // data-change gives a delete-callback which is used to remove watch
+                  subscribeChildChange(childPath, context);
+                  subscribeDataChange(childPath, context);
+                  
+                  // subscribe data-change on bucketized child
+                  List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
+                  if (bucketizedChildNames != null) 
+                  {
+                    for (String bucketizedChildName : bucketizedChildNames)
+                    {
+                       String bucketizedChildPath = childPath + "/" + bucketizedChildName;
+                       subscribeDataChange(bucketizedChildPath, context);
+                    }  
+                  }
+                } else
+                {
+                    subscribeDataChange(childPath, context);
+                }
+        	}
+        	break;
         }
-
-        for (String childName : childNames)
+        default:
         {
-          String childPath = path + "/" + childName;
-          if (type == NotificationContext.Type.INIT
-              || type == NotificationContext.Type.CALLBACK)
-          {
-            if (logger.isDebugEnabled())
+            List<String> childNames = _zkClient.getChildren(path);
+            if (childNames != null) 
             {
-              logger.debug(_manager.getInstanceName() + " subscribe data change@" + childPath);
+              for (String childName : childNames)
+              {
+                 String childPath = path + "/" + childName;
+                 subscribeDataChange(childPath, context);
+              }  
             }
-            _zkClient.subscribeDataChanges(childPath, this);
-
-          }
-          else if (type == NotificationContext.Type.FINALIZE)
-          {
-            logger.info(_manager.getInstanceName() + " UNsubscribe data change@" + childPath);
-            _zkClient.unsubscribeDataChanges(childPath, this);
-          }
-
-          subscribeForChanges(context, childPath, watchParent, watchChild);
+        	break;
         }
+    	}
       }
       catch (ZkNoNodeException e)
       {
-        logger.warn("fail to subscribe child data change@" + path);
+        logger.warn("fail to subscribe child/data change@" + path, e);
       }
     }
 
@@ -341,10 +397,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       updateNotificationTime(System.nanoTime());
       if (dataPath != null && dataPath.startsWith(_path))
       {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        _zkClient.unsubscribeChildChanges(dataPath, this);
-        invoke(changeContext);
+          logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " 
+        		  + dataPath + ", listener: " + _listener);
+          _zkClient.unsubscribeDataChanges(dataPath, this);
+
+          // only needed for bucketized parent, but OK if we don't have child-change watch on the path
+          logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
+        		  + dataPath + ", listener: " + _listener);
+          _zkClient.unsubscribeChildChanges(dataPath, this);
+
+          // No need to invoke() since this event will handled by child-change on parent-node
+//        NotificationContext changeContext = new NotificationContext(_manager);
+//        changeContext.setType(NotificationContext.Type.CALLBACK);
+// 		  invoke(changeContext);
       }
     }
     catch (Exception e)
@@ -392,17 +457,17 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
 
   private void updateNotificationTime(long nanoTime)
   {
-    long l = lastNotificationTimeStamp.get();
+    long l = _lastNotificationTimeStamp.get();
     while (nanoTime > l)
     {
-      boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
+      boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
       if (b)
       {
         break;
       }
       else
       {
-        l = lastNotificationTimeStamp.get();
+        l = _lastNotificationTimeStamp.get();
       }
     }
   }