You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/10/06 01:44:15 UTC
[helix] branch master updated: Enhancement of ClusterVerifier and
TaskDriver (#1432)
This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 5dfd02e Enhancement of ClusterVerifier and TaskDriver (#1432)
5dfd02e is described below
commit 5dfd02e329d5a709176e571381f0c2adf78383a3
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Mon Oct 5 18:44:03 2020 -0700
Enhancement of ClusterVerifier and TaskDriver (#1432)
Fix ClusterVerifier to honor waitTillVerify, and enhance
logging in TaskDriver and TaskUtil so that the race
condition between job adding and selective update is
logged
---
.../apache/helix/messaging/handling/HelixTaskExecutor.java | 4 +++-
.../src/main/java/org/apache/helix/task/TaskDriver.java | 10 ++++++++--
.../src/main/java/org/apache/helix/task/TaskUtil.java | 3 ++-
.../ClusterVerifiers/BestPossibleExternalViewVerifier.java | 12 ++++++------
.../ClusterVerifiers/StrictMatchExternalViewVerifier.java | 8 ++++----
.../tools/ClusterVerifiers/ZkHelixClusterVerifier.java | 13 +++++++++++--
6 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 45f31da..f5c2dea 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -177,7 +177,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
_lock = new Object();
_statusUpdateUtil = new StatusUpdateUtil();
- _timer = new Timer("HelixTaskExecutor_timer", true); // created as a daemon timer thread to handle task timeout
+ _timer = new Timer("HelixTaskExecutor_Timer", true); // created as a daemon timer thread to handle task timeout
_isShuttingDown = false;
@@ -622,6 +622,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
item.factory().reset();
}
}
+ // Thread pools specific to STATE_TRANSITION.Key are not shut down here.
+ // This is a potential area for improvement: https://github.com/apache/helix/issues/1245
StringBuilder sb = new StringBuilder();
// Log all tasks that fail to terminate
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 6af6791..fe43166 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -941,9 +941,15 @@ public class TaskDriver {
&& System.currentTimeMillis() < st + timeout);
if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
+ WorkflowConfig wfcfg = getWorkflowConfig(workflowName);
+ JobConfig jobConfig = getJobConfig(jobName);
+ JobContext jbCtx = getJobContext(jobName);
throw new HelixException(
- String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s",
- workflowName, jobName, allowedStates));
+ String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s",
+ workflowName, jobName, allowedStates,
+ ctx == null ? "null" : ctx, ctx != null ? ctx.getJobState(jobName) : "null",
+ wfcfg, jobConfig, jbCtx));
+
}
return ctx.getJobState(jobName);
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 3c2732f..fa89cde 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -952,6 +952,7 @@ public class TaskUtil {
return false;
}
}
+ LOG.info("removed job context {}.", path);
return true;
}
@@ -971,7 +972,7 @@ public class TaskUtil {
return false;
}
}
-
+ LOG.info("removed job config {}.", cfgKey.getPath());
return true;
}
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 744624d..3460b23 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -78,8 +78,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
*/
@Deprecated
public BestPossibleExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
- Map<String, Map<String, String>> errStates, Set<String> expectLiveInstances) {
- super(zkAddr, clusterName);
+ Map<String, Map<String, String>> errStates, Set<String> expectLiveInstances, int waitTillVerify) {
+ super(zkAddr, clusterName, waitTillVerify);
_errStates = errStates;
_resources = resources;
_expectLiveInstances = expectLiveInstances;
@@ -97,8 +97,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
@Deprecated
public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
Set<String> resources, Map<String, Map<String, String>> errStates,
- Set<String> expectLiveInstances) {
- super(zkClient, clusterName, 0);
+ Set<String> expectLiveInstances, int waitTillVerify) {
+ super(zkClient, clusterName, waitTillVerify);
_errStates = errStates;
_resources = resources;
_expectLiveInstances = expectLiveInstances;
@@ -138,13 +138,13 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
if (_zkClient != null) {
return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _resources, _errStates,
- _expectLiveInstances);
+ _expectLiveInstances, _waitPeriodTillVerify);
}
if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
// For backward-compatibility
return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName, _resources,
- _errStates, _expectLiveInstances);
+ _errStates, _expectLiveInstances, _waitPeriodTillVerify);
}
validate();
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 1073dad..bb991bd 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -60,7 +60,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
@Deprecated
public StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
Set<String> expectLiveInstances) {
- this(zkAddr, clusterName, resources, expectLiveInstances, false);
+ this(zkAddr, clusterName, resources, expectLiveInstances, false, 0);
}
@Deprecated
@@ -71,8 +71,8 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
@Deprecated
private StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
- Set<String> expectLiveInstances, boolean isDeactivatedNodeAware) {
- super(zkAddr, clusterName);
+ Set<String> expectLiveInstances, boolean isDeactivatedNodeAware, int waitTillVerify) {
+ super(zkAddr, clusterName, waitTillVerify);
_resources = resources;
_expectLiveInstances = expectLiveInstances;
_isDeactivatedNodeAware = isDeactivatedNodeAware;
@@ -108,7 +108,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
// For backward-compatibility
return new StrictMatchExternalViewVerifier(_zkAddress, _clusterName, _resources,
- _expectLiveInstances, _isDeactivatedNodeAware);
+ _expectLiveInstances, _isDeactivatedNodeAware, _waitPeriodTillVerify);
}
validate();
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 734065f..9050592 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -107,7 +107,7 @@ public abstract class ZkHelixClusterVerifier
}
@Deprecated
- public ZkHelixClusterVerifier(String zkAddr, String clusterName) {
+ public ZkHelixClusterVerifier(String zkAddr, String clusterName, int waitPeriodTillVerify) {
if (clusterName == null || clusterName.isEmpty()) {
throw new IllegalArgumentException("ZkHelixClusterVerifier: clusterName is null or empty!");
}
@@ -138,7 +138,7 @@ public abstract class ZkHelixClusterVerifier
_clusterName = clusterName;
_accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
_keyBuilder = _accessor.keyBuilder();
- _waitPeriodTillVerify = 0;
+ _waitPeriodTillVerify = waitPeriodTillVerify;
}
/**
@@ -355,6 +355,15 @@ public abstract class ZkHelixClusterVerifier
return setZkAddress(zkAddress);
}
+ /**
+ * The verify() methods of this class and its subclasses, such as
+ * BestPossibleExternalViewVerifier, are intended to wait for the cluster to converge to a stable
+ * state after changes are made to the cluster. However, after changes are made, it takes some time
+ * for the controller to pick them up. If verify() runs too early, before the controller has
+ * processed the changes, the verifier may mistake the previous stable cluster state for the new
+ * (expected) stable state, which causes various issues. Thus, we supply a wait period before
+ * starting to validate the next expected state, to avoid this premature stable-state validation.
+ */
public B setWaitTillVerify(int waitPeriod) {
_waitPeriodTillVerify = waitPeriod;
return (B) this;