You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2019/01/28 19:54:04 UTC

[3/3] helix git commit: Improve the callback handler behavior regarding batch mode event handling when handler is reset.

Improve the callback handler behavior regarding batch mode event handling when handler is reset.

For new session handling, the callback handler should not interrupt the current executing process. This could cause a pending request failed unexpectedly. Note that after the change, closing a callback will still interrupt thread to avoid thread leak.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c8f79c5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c8f79c5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c8f79c5

Branch: refs/heads/master
Commit: 8c8f79c5e545818c0e4478addea453fc42b6d003
Parents: 867aa5f
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jan 11 15:53:17 2019 -0800
Committer: jiajunwang <er...@gmail.com>
Committed: Fri Jan 25 13:04:07 2019 -0800

----------------------------------------------------------------------
 .../helix/common/DedupEventProcessor.java       |  4 ++
 .../helix/manager/zk/CallbackHandler.java       | 55 ++++++++++++++------
 .../apache/helix/manager/zk/ZKHelixManager.java | 14 ++---
 3 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8c8f79c5/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
index 10e3b00..c26ecd8 100644
--- a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
@@ -64,6 +64,10 @@ public abstract class DedupEventProcessor<T, E> extends Thread {
     _eventQueue.put(eventType, event);
   }
 
+  public void resetEventQueue() {
+    _eventQueue.clear();
+  }
+
   public void shutdown() {
     this.interrupt();
     _eventQueue.clear();

http://git-wip-us.apache.org/repos/asf/helix/blob/8c8f79c5/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 9e9d1a7..713d214 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
@@ -37,7 +38,13 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
@@ -47,15 +54,9 @@ import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
-import org.apache.helix.api.listeners.BatchMode;
-import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
@@ -67,9 +68,9 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.Watcher.Event.EventType;
 
 import static org.apache.helix.HelixConstants.ChangeType.*;
 
@@ -320,7 +321,13 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
             "CallbackHandler is not ready, ignore change callback from path: "
                 + _path + ", for listener: " + _listener);
       } else {
-        _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext);
+        synchronized (this) {
+          if (_batchCallbackProcessor != null) {
+            _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext);
+          } else {
+            throw new HelixException("Failed to process callback in batch mode. Batch Callback Processor does not exist.");
+          }
+        }
       }
     } else {
       invoke(changeContext);
@@ -573,11 +580,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     logger.info("initializing CallbackHandler: " + this.toString() + " content: " + getContent());
 
     if (_batchModeEnabled) {
-      if (_batchCallbackProcessor != null) {
-        _batchCallbackProcessor.shutdown();
+      synchronized (this) {
+        if (_batchCallbackProcessor != null) {
+          _batchCallbackProcessor.resetEventQueue();
+        } else {
+          _batchCallbackProcessor = new CallbackProcessor(this);
+          _batchCallbackProcessor.start();
+        }
       }
-      _batchCallbackProcessor = new CallbackProcessor(this);
-      _batchCallbackProcessor.start();
     }
 
     updateNotificationTime(System.nanoTime());
@@ -674,12 +684,25 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   /**
    * Invoke the listener for the last time so that the listener could clean up resources
    */
+  @Deprecated
   public void reset() {
-    logger.info("Resetting CallbackHandler: " + this.toString());
+    reset(true);
+  }
+
+  void reset(boolean isShutdown) {
+    logger.info("Resetting CallbackHandler: {}. Is resetting for shutdown: {}.", this.toString(),
+        isShutdown);
     try {
       _ready = false;
-      if (_batchCallbackProcessor != null) {
-        _batchCallbackProcessor.shutdown();
+      synchronized (this) {
+        if (_batchCallbackProcessor != null) {
+          if (isShutdown) {
+            _batchCallbackProcessor.shutdown();
+            _batchCallbackProcessor = null;
+          } else {
+            _batchCallbackProcessor.resetEventQueue();
+          }
+        }
       }
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);

http://git-wip-us.apache.org/repos/asf/helix/blob/8c8f79c5/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index ba3c16f..aa08c76 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -298,7 +298,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
       // handler.reset() may modify the handlers list, so do it outside the iteration
       for (CallbackHandler handler : toRemove) {
-        handler.reset();
+        handler.reset(true);
       }
     }
 
@@ -717,10 +717,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       // TODO reset user defined handlers only
       // TODO Fix the issue that when connection disconnected, reset handlers will be blocked. -- JJ
       // This is because reset logic contains ZK operations.
-      resetHandlers();
+      resetHandlers(true);
 
       if (_leaderElectionHandler != null) {
-        _leaderElectionHandler.reset();
+        _leaderElectionHandler.reset(true);
       }
 
     } finally {
@@ -947,7 +947,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     }
   }
 
-  void resetHandlers() {
+  void resetHandlers(boolean isShutdown) {
     synchronized (this) {
       if (_handlers != null) {
         // get a copy of the list and iterate over the copy list
@@ -956,7 +956,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         tmpHandlers.addAll(_handlers);
 
         for (CallbackHandler handler : tmpHandlers) {
-          handler.reset();
+          handler.reset(isShutdown);
           LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
         }
       }
@@ -1054,9 +1054,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
      */
     stopTimerTasks();
     if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
+      _leaderElectionHandler.reset(false);
     }
-    resetHandlers();
+    resetHandlers(false);
 
     /**
      * clean up write-through cache