You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/11/09 19:50:37 UTC

[helix] branch master updated: Remove duplicate subscribe in CallBackHandler (#1504)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 11fa63a  Remove duplicate subscribe in CallBackHandler (#1504)
11fa63a is described below

commit 11fa63ae3039cbf2dc56ca1c9596e8379751db0c
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Mon Nov 9 11:50:29 2020 -0800

    Remove duplicate subscribe in CallBackHandler (#1504)
    
    Remove duplicate subscribe in CallbackHandler.handleChildChange()
    
    Duplicate subscriptions increase the time zkClient spends processing callbacks, which eventually leads to a larger PendingCallback queue. This PR removes the duplicate subscribeForChanges call in CallbackHandler to improve performance.
---
 .../java/org/apache/helix/NotificationContext.java |  9 +++
 .../apache/helix/manager/zk/CallbackHandler.java   | 64 ++--------------------
 .../integration/TestZkCallbackHandlerLeak.java     | 28 ++++++----
 3 files changed, 30 insertions(+), 71 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index 4195d94..b35968b 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -43,6 +43,7 @@ public class NotificationContext {
   private String _pathChanged;
   private String _eventName;
   private long _creationTime;
+  private boolean _isChildChange;
 
   /**
    * Get the name associated with the event
@@ -227,4 +228,12 @@ public class NotificationContext {
   public void setChangeType(HelixConstants.ChangeType changeType) {
     this._changeType = changeType;
   }
+
+  public boolean getIsChildChange() {
+    return _isChildChange;
+  }
+
+  public void setIsChildChange(boolean isChildChange) {
+    this._isChildChange = isChildChange;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index dece3de..a57a678 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -118,8 +118,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
   }
 
-  // processor to handle async zk event resubscription.
-  private static DedupEventProcessor SubscribeChangeEventProcessor;
 
   private final String _path;
   private final Object _listener;
@@ -142,50 +140,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   // indicated whether this CallbackHandler is ready to serve event callback from ZkClient.
   private boolean _ready = false;
 
-  static {
-    SubscribeChangeEventProcessor = new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>(
-        "Singleton", "CallbackHandler-AsycSubscribe") {
-      @Override
-      protected void handleEvent(SubscribeChangeEvent event) {
-        logger.info("CallbackHandler {}, resubscribe change listener to path: {}, for listener: {}, watchChild: {}",
-            event.handler._uid, event.path, event.listener, event.watchChild);
-        try {
-          if (event.handler.isReady()) {
-            event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
-          } else {
-            logger.info("CallbackHandler is not ready, stop subscribing changes listener to "
-                    + "path: {} for listener: {} watchChild: {}", event.path, event.listener,
-                event.listener);
-          }
-        } catch (Exception e) {
-          logger.error("Failed to resubscribe change to path: {} for listener: {}", event.path,
-              event.listener, e);
-        }
-      }
-    };
-
-    SubscribeChangeEventProcessor.start();
-  }
-
-  class SubscribeChangeEvent {
-    final CallbackHandler handler;
-    final String path;
-    final NotificationContext.Type callbackType;
-    final Object listener;
-    final boolean watchChild;
-
-    SubscribeChangeEvent(CallbackHandler handler, NotificationContext.Type callbackType,
-        String path, boolean watchChild, Object listener) {
-      this.handler = handler;
-      this.path = path;
-      this.callbackType = callbackType;
-      this.listener = listener;
-      this.watchChild = watchChild;
-    }
-  }
-
-  class CallbackProcessor
-      extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
+  class CallbackProcessor extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
     private CallbackHandler _handler;
 
     public CallbackProcessor(CallbackHandler handler) {
@@ -402,13 +357,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       }
       _expectTypes = nextNotificationType.get(type);
 
-      if (type == Type.INIT || type == Type.FINALIZE) {
+      if (type == Type.INIT || type == Type.FINALIZE || changeContext.getIsChildChange()) {
         subscribeForChanges(changeContext.getType(), _path, _watchChild);
-      } else {
-        // put SubscribeForChange run in async thread to reduce the latency of zk callback handling.
-        subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
       }
-
       if (_changeType == IDEAL_STATE) {
         IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
         List<IdealState> idealStates = preFetch(_propertyKey);
@@ -598,14 +549,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     }
   }
 
-  /** Subscribe Changes in asynchronously */
-  private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path,
-      boolean watchChild) {
-    SubscribeChangeEvent subscribeEvent =
-        new SubscribeChangeEvent(this, callbackType, path, watchChild, _listener);
-    SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, subscribeEvent);
-  }
-
   private void subscribeForChanges(NotificationContext.Type callbackType, String path,
       boolean watchChild) {
     logger.info("CallbackHandler {} subscribing changes listener to path: {}, callback type: {}, "
@@ -734,6 +677,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         changeContext.setType(NotificationContext.Type.CALLBACK);
         changeContext.setPathChanged(dataPath);
         changeContext.setChangeType(_changeType);
+        changeContext.setIsChildChange(false);
         enqueueTask(changeContext);
       }
     } catch (Exception e) {
@@ -796,7 +740,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
           changeContext.setType(NotificationContext.Type.CALLBACK);
           changeContext.setPathChanged(parentPath);
           changeContext.setChangeType(_changeType);
-          subscribeForChanges(changeContext.getType(), _path, _watchChild);
+          changeContext.setIsChildChange(true);
           enqueueTask(changeContext);
         }
       }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 60dbf91..2f27d4b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -26,9 +26,6 @@ import java.util.Set;
 
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
@@ -40,12 +37,12 @@ import org.apache.helix.integration.manager.ClusterSpectatorManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.slf4j.Logger;
@@ -467,23 +464,32 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     cs.setSessionId(jobSessionId);
     cs.setStateModelDefRef(db0.getStateModelDefRef());
 
+    Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    Assert.assertFalse(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));
+
     LOG.info("add job");
-    boolean rtJob = false;
     for (int i = 0; i < mJobUpdateCnt; i++) {
-      rtJob = jobAccesor.setProperty(jobKey, cs);
+      jobAccesor.setProperty(jobKey, cs);
     }
 
+    // verify new watcher is installed on the new node
+    boolean result = TestHelper.verify(() -> {
+      return ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath());
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Should get initial clusterConfig callback invoked");
+    rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));
+
     LOG.info("remove job");
-    rtJob = jobParticipant.getZkClient().delete(jobKey.getPath());
+    jobParticipant.getZkClient().delete(jobKey.getPath());
 
     // validate the job watch is not leaked.
     Thread.sleep(5000);
 
     Map<String, Set<String>> listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR);
-    boolean jobKeyExists = listenersByZkPath.keySet().contains(jobKey.getPath());
-    Assert.assertFalse(jobKeyExists);
+    Assert.assertFalse(listenersByZkPath.keySet().contains(jobKey.getPath()));
 
-    Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
     List<String> existWatches = rpWatchPaths.get("existWatches");
     Assert.assertTrue(existWatches.isEmpty());