You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/10/06 01:44:15 UTC

[helix] branch master updated: Enhancement of ClusterVerifier and TaskDriver (#1432)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dfd02e  Enhancement of ClusterVerifier and TaskDriver (#1432)
5dfd02e is described below

commit 5dfd02e329d5a709176e571381f0c2adf78383a3
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Mon Oct 5 18:44:03 2020 -0700

    Enhancement of ClusterVerifier and TaskDriver (#1432)
    
    Fix ClusterVerifier to take waitTillVerify, and enhance the
    logging in TaskDriver and TaskUtil so that the race condition
    between job addition and selective update is logged when it
    occurs.
---
 .../apache/helix/messaging/handling/HelixTaskExecutor.java  |  4 +++-
 .../src/main/java/org/apache/helix/task/TaskDriver.java     | 10 ++++++++--
 .../src/main/java/org/apache/helix/task/TaskUtil.java       |  3 ++-
 .../ClusterVerifiers/BestPossibleExternalViewVerifier.java  | 12 ++++++------
 .../ClusterVerifiers/StrictMatchExternalViewVerifier.java   |  8 ++++----
 .../tools/ClusterVerifiers/ZkHelixClusterVerifier.java      | 13 +++++++++++--
 6 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 45f31da..f5c2dea 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -177,7 +177,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
 
-    _timer = new Timer("HelixTaskExecutor_timer", true); // created as a daemon timer thread to handle task timeout
+    _timer = new Timer("HelixTaskExecutor_Timer", true); // created as a daemon timer thread to handle task timeout
 
     _isShuttingDown = false;
 
@@ -622,6 +622,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         item.factory().reset();
       }
     }
+    // Thread pools specific to STATE_TRANSITION (the key-specific pools) are not shut down here.
+    // This is a potential area for improvement: https://github.com/apache/helix/issues/1245
 
     StringBuilder sb = new StringBuilder();
     // Log all tasks that fail to terminate
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 6af6791..fe43166 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -941,9 +941,15 @@ public class TaskDriver {
         && System.currentTimeMillis() < st + timeout);
 
     if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
+      WorkflowConfig wfcfg = getWorkflowConfig(workflowName);
+      JobConfig jobConfig = getJobConfig(jobName);
+      JobContext jbCtx = getJobContext(jobName);
       throw new HelixException(
-          String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s",
-              workflowName, jobName, allowedStates));
+          String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
+              workflowName, jobName, allowedStates,
+              ctx == null ? "null" : ctx, ctx != null ? ctx.getJobState(jobName) : "null",
+              wfcfg, jobConfig, jbCtx));
+
     }
 
     return ctx.getJobState(jobName);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 3c2732f..fa89cde 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -952,6 +952,7 @@ public class TaskUtil {
         return false;
       }
     }
+    LOG.info("removed job context {}.", path);
     return true;
   }
 
@@ -971,7 +972,7 @@ public class TaskUtil {
         return false;
       }
     }
-
+    LOG.info("removed job config {}.", cfgKey.getPath());
     return true;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 744624d..3460b23 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -78,8 +78,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
    */
   @Deprecated
   public BestPossibleExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
-      Map<String, Map<String, String>> errStates, Set<String> expectLiveInstances) {
-    super(zkAddr, clusterName);
+      Map<String, Map<String, String>> errStates, Set<String> expectLiveInstances, int waitTillVerify) {
+    super(zkAddr, clusterName, waitTillVerify);
     _errStates = errStates;
     _resources = resources;
     _expectLiveInstances = expectLiveInstances;
@@ -97,8 +97,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
   @Deprecated
   public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
       Set<String> resources, Map<String, Map<String, String>> errStates,
-      Set<String> expectLiveInstances) {
-    super(zkClient, clusterName, 0);
+      Set<String> expectLiveInstances, int waitTillVerify) {
+    super(zkClient, clusterName, waitTillVerify);
     _errStates = errStates;
     _resources = resources;
     _expectLiveInstances = expectLiveInstances;
@@ -138,13 +138,13 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
 
       if (_zkClient != null) {
         return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _resources, _errStates,
-            _expectLiveInstances);
+            _expectLiveInstances, _waitPeriodTillVerify);
       }
 
       if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
         // For backward-compatibility
         return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName, _resources,
-            _errStates, _expectLiveInstances);
+            _errStates, _expectLiveInstances, _waitPeriodTillVerify);
       }
 
       validate();
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 1073dad..bb991bd 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -60,7 +60,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
   @Deprecated
   public StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
       Set<String> expectLiveInstances) {
-    this(zkAddr, clusterName, resources, expectLiveInstances, false);
+    this(zkAddr, clusterName, resources, expectLiveInstances, false, 0);
   }
 
   @Deprecated
@@ -71,8 +71,8 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
 
   @Deprecated
   private StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
-      Set<String> expectLiveInstances, boolean isDeactivatedNodeAware) {
-    super(zkAddr, clusterName);
+      Set<String> expectLiveInstances, boolean isDeactivatedNodeAware, int waitTillVerify) {
+    super(zkAddr, clusterName, waitTillVerify);
     _resources = resources;
     _expectLiveInstances = expectLiveInstances;
     _isDeactivatedNodeAware = isDeactivatedNodeAware;
@@ -108,7 +108,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
       if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
         // For backward-compatibility
         return new StrictMatchExternalViewVerifier(_zkAddress, _clusterName, _resources,
-            _expectLiveInstances, _isDeactivatedNodeAware);
+            _expectLiveInstances, _isDeactivatedNodeAware, _waitPeriodTillVerify);
       }
 
       validate();
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 734065f..9050592 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -107,7 +107,7 @@ public abstract class ZkHelixClusterVerifier
   }
 
   @Deprecated
-  public ZkHelixClusterVerifier(String zkAddr, String clusterName) {
+  public ZkHelixClusterVerifier(String zkAddr, String clusterName, int waitPeriodTillVerify) {
     if (clusterName == null || clusterName.isEmpty()) {
       throw new IllegalArgumentException("ZkHelixClusterVerifier: clusterName is null or empty!");
     }
@@ -138,7 +138,7 @@ public abstract class ZkHelixClusterVerifier
     _clusterName = clusterName;
     _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
     _keyBuilder = _accessor.keyBuilder();
-    _waitPeriodTillVerify = 0;
+    _waitPeriodTillVerify = waitPeriodTillVerify;
   }
 
   /**
@@ -355,6 +355,15 @@ public abstract class ZkHelixClusterVerifier
       return setZkAddress(zkAddress);
     }
 
+    /**
+     * The verify() methods of this class and its subclasses, such as
+     * BestPossibleExternalViewVerifier, are intended to wait for the cluster to converge to a
+     * stable state after changes are made to it. However, the controller takes some time to pick
+     * up those changes. If verify() runs too early, before the controller has processed the
+     * changes, it may mistake the previous stable cluster state for the new (expected) stable
+     * state, which causes various issues. To avoid this premature validation, a wait period is
+     * applied before starting to validate the next expected state.
+     */
     public B setWaitTillVerify(int waitPeriod) {
       _waitPeriodTillVerify = waitPeriod;
       return (B) this;