You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:43 UTC

[24/50] [abbrv] helix git commit: Adding support to optionally batch ZK callbacks by setting the system property isAsyncBatchModeEnabled=true

Adding support to optionally batch ZK callbacks by setting the system property isAsyncBatchModeEnabled=true


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/384978a2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/384978a2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/384978a2

Branch: refs/heads/master
Commit: 384978a2e16ab0f4adb388e32c7e448c77996ca2
Parents: 015a73c
Author: kishoreg <ki...@apache.org>
Authored: Fri Mar 24 10:48:05 2017 -0700
Committer: kishoreg <ki...@apache.org>
Committed: Fri Mar 24 10:48:05 2017 -0700

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       | 130 ++++++++++++-------
 1 file changed, 86 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/384978a2/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index c3e8206..90df56d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -31,6 +31,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.I0Itec.zkclient.IZkChildListener;
@@ -53,7 +55,7 @@ import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
@@ -89,15 +91,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
   private final AtomicLong _lastNotificationTimeStamp;
   private final HelixManager _manager;
   private final PropertyKey _propertyKey;
-
+  BlockingQueue<NotificationContext> _queue = new LinkedBlockingQueue<NotificationContext>(1000);
+  // Read once at class load from the system property "isAsyncBatchModeEnabled"; anything other than "true" means off.
+  private static final boolean asyncBatchModeEnabled = Boolean.getBoolean("isAsyncBatchModeEnabled");
+  static {
+    logger.info("isAsyncBatchModeEnabled: " + asyncBatchModeEnabled);
+  }
   /**
    * maintain the expected notification types
    * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
-  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
-      Object listener, EventType[] eventTypes, ChangeType changeType) {
+  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey, Object listener, EventType[] eventTypes, ChangeType changeType) {
     if (listener == null) {
       throw new HelixException("listener could not be null");
     }
@@ -111,6 +117,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     this._eventTypes = eventTypes;
     this._changeType = changeType;
     this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+    this._queue = new LinkedBlockingQueue<NotificationContext>(1000);
+    if (asyncBatchModeEnabled) {
+      new Thread(new CallbackInvoker(this)).start();
+    }
     init();
   }
 
@@ -122,13 +132,58 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     return _path;
   }
 
+  /** Runnable that drains the handler's queue, collapsing consecutive notifications of the same type into a single invoke(). */
+  class CallbackInvoker implements Runnable {
+    private final CallbackHandler handler;
+
+    CallbackInvoker(CallbackHandler handler) {
+      this.handler = handler;
+    }
+
+    /** Blocks on the queue until interrupted; an exception thrown by invoke() is logged and that callback is skipped. */
+    public void run() {
+      while (true) {
+        try {
+          NotificationContext notificationToProcess = _queue.take();
+          int mergedCallbacks = 0;
+          // Skip ahead to the newest queued notification of the same type; only that one is delivered.
+          NotificationContext nextItem;
+          while ((nextItem = _queue.peek()) != null && notificationToProcess.getType() == nextItem.getType()) {
+            notificationToProcess = _queue.take();
+            mergedCallbacks++;
+          }
+          try {
+            logger.info("Num callbacks merged for path:" + handler.getPath() + " : " + mergedCallbacks);
+            handler.invoke(notificationToProcess);
+          } catch (Exception e) {
+            logger.warn("Exception in callback processing thread. Skipping callback", e);
+          }
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted exception in callback processing thread. Exiting thread, new callbacks will not be processed", e);
+          // Restore the interrupt status cleared by the blocking take() before leaving the loop.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
+  public void enqueueTask(NotificationContext changeContext) throws Exception {
+    // Async batch mode applies only to CALLBACK notifications from ZK; INIT and FINALIZE are invoked immediately on the caller's thread.
+    if (asyncBatchModeEnabled && changeContext.getType() == NotificationContext.Type.CALLBACK) {
+      logger.info("Enqueuing callback");
+      _queue.put(changeContext);
+    } else {
+      invoke(changeContext);
+    }
+  }
+
   public void invoke(NotificationContext changeContext) throws Exception {
     // This allows the listener to work with one change at a time
     synchronized (_manager) {
       Type type = changeContext.getType();
       if (!_expectTypes.contains(type)) {
-        logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
-            + ", expected types: " + _expectTypes + " but was " + type);
+        logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path + ", expected types: " + _expectTypes + " but was " + type);
         return;
       }
       _expectTypes = nextNotificationType.get(type);
@@ -136,8 +191,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       // Builder keyBuilder = _accessor.keyBuilder();
       long start = System.currentTimeMillis();
       if (logger.isInfoEnabled()) {
-        logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
-            + _listener.getClass().getCanonicalName());
+        logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:" + _listener.getClass().getCanonicalName());
       }
 
       if (_changeType == IDEAL_STATE) {
@@ -165,18 +219,16 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
         List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
         listener.onConfigChange(configs, changeContext);
       } else if (_changeType == LIVE_INSTANCE) {
-        LiveInstanceChangeListener liveInstanceChangeListener =
-            (LiveInstanceChangeListener) _listener;
+        LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true);
         List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
 
         liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
 
       } else if (_changeType == CURRENT_STATE) {
-        CurrentStateChangeListener currentStateChangeListener =
-            (CurrentStateChangeListener) _listener;
+        CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true);
-        String instanceName = PropertyPathBuilder.getInstanceNameFromPath(_path);
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
 
         List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
 
@@ -185,7 +237,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       } else if (_changeType == MESSAGE) {
         MessageListener messageListener = (MessageListener) _listener;
         subscribeForChanges(changeContext, _path, true, false);
-        String instanceName = PropertyPathBuilder.getInstanceNameFromPath(_path);
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
         List<Message> messages = _accessor.getChildValues(_propertyKey);
 
         messageListener.onMessage(instanceName, messages, changeContext);
@@ -211,8 +263,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
 
       long end = System.currentTimeMillis();
       if (logger.isInfoEnabled()) {
-        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
-            + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
+        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:" + _listener.getClass().getCanonicalName() + " Took: " + (end - start)
+            + "ms");
       }
     }
   }
@@ -220,12 +272,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
   private void subscribeChildChange(String path, NotificationContext context) {
     NotificationContext.Type type = context.getType();
     if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
-      logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path
-          + ", listener: " + _listener);
+      logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener);
       _zkClient.subscribeChildChanges(path, this);
     } else if (type == NotificationContext.Type.FINALIZE) {
-      logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
-          + ", listener: " + _listener);
+      logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener);
 
       _zkClient.unsubscribeChildChanges(path, this);
     }
@@ -235,22 +285,20 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     NotificationContext.Type type = context.getType();
     if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
       if (logger.isDebugEnabled()) {
-        logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
-            + ", listener: " + _listener);
+        logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: " + _listener);
       }
       _zkClient.subscribeDataChanges(path, this);
 
     } else if (type == NotificationContext.Type.FINALIZE) {
-      logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
-          + ", listener: " + _listener);
+      logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " + _listener);
 
       _zkClient.unsubscribeDataChanges(path, this);
     }
   }
 
   // TODO watchParent is always true. consider remove it
-  private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
-      boolean watchChild) {
+  private void subscribeForChanges(NotificationContext context, String path, boolean watchParent, boolean watchChild) {
+    long start = System.currentTimeMillis();
     if (watchParent) {
       subscribeChildChange(path, context);
     }
@@ -301,10 +349,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
         }
         }
       } catch (ZkNoNodeException e) {
-        logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
-            + _listener, e);
+        logger.warn("fail to subscribe child/data change. path: " + path + ", listener: " + _listener, e);
       }
     }
+    long end = System.currentTimeMillis();
+    logger.info("Subcribing to path:" + path + " took:" + (end - start));
 
   }
 
@@ -321,7 +370,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.INIT);
-      invoke(changeContext);
+      enqueueTask(changeContext);
     } catch (Exception e) {
       String msg = "Exception while invoking init callback for listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
@@ -335,11 +384,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       if (dataPath != null && dataPath.startsWith(_path)) {
         NotificationContext changeContext = new NotificationContext(_manager);
         changeContext.setType(NotificationContext.Type.CALLBACK);
-        invoke(changeContext);
+        enqueueTask(changeContext);
       }
     } catch (Exception e) {
-      String msg =
-          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+      String msg = "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
@@ -349,14 +397,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     try {
       updateNotificationTime(System.nanoTime());
       if (dataPath != null && dataPath.startsWith(_path)) {
-        logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
-            + ", listener: " + _listener);
+        logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath + ", listener: " + _listener);
         _zkClient.unsubscribeDataChanges(dataPath, this);
 
         // only needed for bucketized parent, but OK if we don't have child-change
         // watch on the bucketized parent path
-        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
-            + ", listener: " + _listener);
+        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath + ", listener: " + _listener);
         _zkClient.unsubscribeChildChanges(dataPath, this);
         // No need to invoke() since this event will handled by child-change on parent-node
         // NotificationContext changeContext = new NotificationContext(_manager);
@@ -364,9 +410,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
         // invoke(changeContext);
       }
     } catch (Exception e) {
-      String msg =
-          "exception in handling data-delete-change. path: " + dataPath + ", listener: "
-              + _listener;
+      String msg = "exception in handling data-delete-change. path: " + dataPath + ", listener: " + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
@@ -384,13 +428,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
           _manager.removeListener(_propertyKey, _listener);
         } else {
           changeContext.setType(NotificationContext.Type.CALLBACK);
-          invoke(changeContext);
+          enqueueTask(changeContext);
         }
       }
     } catch (Exception e) {
-      String msg =
-          "exception in handling child-change. instance: " + _manager.getInstanceName()
-              + ", parentPath: " + parentPath + ", listener: " + _listener;
+      String msg = "exception in handling child-change. instance: " + _manager.getInstanceName() + ", parentPath: " + parentPath + ", listener: " + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
@@ -402,7 +444,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);
-      invoke(changeContext);
+      enqueueTask(changeContext);
     } catch (Exception e) {
       String msg = "Exception while resetting the listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);