You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/03/21 01:13:59 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #904: Fix the concurrent modification error happens during the HelixManager initHandlers() call

jiajunwang commented on a change in pull request #904: Fix the concurrent modification error happens during the HelixManager initHandlers() call
URL: https://github.com/apache/helix/pull/904#discussion_r395946837
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
 ##########
 @@ -466,6 +468,79 @@ public void testSessionExpiredWhenResetHandlers() throws Exception {
     deleteCluster(clusterName);
   }
 
+  @Test
+  public void testConcurrentInitCallbackHandlers() throws Exception {
+    final String clusterName =
+        CLUSTER_PREFIX + "_" + _className + "_" + TestHelper.getTestMethodName();
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+    final String spectatorName = "testConcurrentHandlerChangeSpectator";
+    try {
+      BlockingHandleNewSessionZkHelixManager helixManager =
+          new BlockingHandleNewSessionZkHelixManager(clusterName, spectatorName,
+              InstanceType.SPECTATOR, _gZkClient.getServers());
+      helixManager.connect();
+      // Add two mock listeners that will add more callback handlers while handling INIT or CALLBACK event.
+      // Note that we have to test with 2 separate listeners so one of them has a chance to fail if
+      // there is a concurrent modification exception.
+      helixManager.addLiveInstanceChangeListener(
+          (LiveInstanceChangeListener) (liveInstances, changeContext) -> {
+            if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
+              for (LiveInstance liveInstance : liveInstances) {
+                if (liveInstance.getInstanceName().equals("localhost_1")) {
+                  try {
+                    helixManager.addCurrentStateChangeListener(
+                        (CurrentStateChangeListener) (instanceName, statesInfo, currentStateChangeContext) -> {
+                          // empty callback
+                        }, liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
+                  } catch (Exception e) {
+                    throw new HelixException(
+                        "Unexpected exception in the testConcurrentHandlerProcessing.", e);
+                  }
+                }
+              }
+            }
+          });
+      helixManager.addLiveInstanceChangeListener(
+          (LiveInstanceChangeListener) (liveInstances, changeContext) -> {
+            if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
+              for (LiveInstance liveInstance : liveInstances) {
+                if (liveInstance.getInstanceName().equals("localhost_2")) {
+                  try {
+                    helixManager.addCurrentStateChangeListener(
+                        (CurrentStateChangeListener) (instanceName, statesInfo, currentStateChangeContext) -> {
+                          // empty callback
+                        }, liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
+                  } catch (Exception e) {
+                    throw new HelixException(
+                        "Unexpected exception in the testConcurrentHandlerProcessing.", e);
+                  }
+                }
+              }
+            }
+          });
+      // Session expiry will trigger all callbacks to be initialized, and the injected liveInstance
+      // listener will cause more callback handlers to be registered during the init process.
+      ZkTestHelper.asyncExpireSession(helixManager.getZkClient());
+      // Create mock live instance znodes to trigger the internal callback handling logic which will
+      // modify the handler list.
+      setupLiveInstances(clusterName, new int[] { 1, 2 });
+      // Start new session handling so the manager will call the initHandler() for initializing all
+      // existing handlers.
+      helixManager.proceedNewSessionHandling();
+      // Ensure the new session has been processed.
+      TestHelper.verify(() -> helixManager.getHandleNewSessionEndTime() != 0, 3000);
+      // Verify that both new mock current state callback handlers have been initialized normally.
+      // Note that if a concurrent modification causes errors, one of the callbacks will
+      // not be initialized normally.
+      for (CallbackHandler handler : helixManager.getHandlers()) {
+        Assert.assertTrue(handler.isReady(),
+            "CallbackHandler is not initialized as expected. It might be caused by a ConcurrentModificationException");
 
 Review comment:
   This is done by the auto-formatter.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org