You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/29 01:38:11 UTC
[1/4] helix git commit: Add ABORT state in TaskState
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 9fc6c540b -> dca1ed05b
Add ABORT state in TaskState
1. Add ABORT state in TaskState
2. Set IN_PROGRESS jobs to ABORTED when the workflow fails
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/516ff0dc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/516ff0dc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/516ff0dc
Branch: refs/heads/helix-0.6.x
Commit: 516ff0dcad13cea578168b41752b62e45d43362d
Parents: 9fc6c54
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:28:58 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:28:58 2016 -0700
----------------------------------------------------------------------
.../org/apache/helix/task/TaskRebalancer.java | 5 +++
.../java/org/apache/helix/task/TaskState.java | 6 +++-
.../TestGenericTaskAssignmentCalculator.java | 36 ++++++++++++++++++++
3 files changed, 46 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 22f91e7..1517c5a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -83,6 +83,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
failedJobs ++;
if (failedJobs > cfg.getFailureThreshold()) {
ctx.setWorkflowState(TaskState.FAILED);
+ for (String jobToFail : cfg.getJobDag().getAllNodes()) {
+ if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
+ ctx.setJobState(jobToFail, TaskState.ABORTED);
+ }
+ }
return true;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index dac5df9..1000a9b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -43,5 +43,9 @@ public enum TaskState {
/**
* All the task partitions have completed normally.
*/
- COMPLETED
+ COMPLETED,
+ /**
+ * The tasks are aborted because the workflow failed
+ */
+ ABORTED
}
http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index 5645009..7cff051 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -39,6 +39,7 @@ import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.tools.ClusterSetup;
import org.testng.Assert;
@@ -54,6 +55,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
private Map<String, Integer> _runCounts = Maps.newHashMap();
private TaskConfig _taskConfig;
private Map<String, String> _jobCommandMap;
+ private boolean failTask;
@BeforeClass
public void beforeClass() throws Exception {
@@ -110,6 +112,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
@Test
public void testMultipleJobAssignment() throws InterruptedException {
+ failTask = false;
String workflowName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
@@ -130,6 +133,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
@Test
public void testMultipleTaskAssignment() throws InterruptedException {
+ failTask = false;
String workflowName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
@@ -148,6 +152,35 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
Assert.assertEquals(_runCounts.size(), 5);
}
+ @Test
+ public void testAbortTaskForWorkflowFail()
+ throws InterruptedException {
+ failTask = true;
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+ taskConfigs.add(_taskConfig);
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(_jobCommandMap);
+
+ for (int i = 0; i < 5; i++) {
+ workflowBuilder.addJob("JOB" + i, jobBuilder);
+ }
+
+ _driver.start(workflowBuilder.build());
+ _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
+
+ int abortedTask = 0;
+ for (TaskState jobState : _driver.getWorkflowContext(workflowName).getJobStates().values()) {
+ if (jobState == TaskState.ABORTED) {
+ abortedTask++;
+ }
+ }
+
+ Assert.assertEquals(abortedTask, 4);
+ }
+
private class TaskOne extends MockTask {
private final String _instanceName;
@@ -165,6 +198,9 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
public TaskResult run() {
_invokedClasses.add(getClass().getName());
_runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
+ if (failTask) {
+ return new TaskResult(TaskResult.Status.FAILED, "");
+ }
return new TaskResult(TaskResult.Status.COMPLETED, "");
}
}
[4/4] helix git commit: Fix NPE on first call to
WorkflowRebalancer
Posted by lx...@apache.org.
Fix NPE on first call to WorkflowRebalancer
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dca1ed05
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dca1ed05
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dca1ed05
Branch: refs/heads/helix-0.6.x
Commit: dca1ed05bfccce69814fdb59b095d734d1c82de0
Parents: f5705dc
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:37:12 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:37:12 2016 -0700
----------------------------------------------------------------------
.../apache/helix/monitoring/mbeans/ClusterStatusMonitor.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/dca1ed05/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 9b13e76..7f996c5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -463,6 +463,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
private void updateJobGauges(JobConfig jobConfig, TaskState current) {
+ // On the first WorkflowRebalancer call the job config may not be ready yet,
+ // so skip the gauge update when it is null.
+ if (jobConfig == null) {
+ return;
+ }
String jobType = jobConfig.getJobType();
jobType = preProcessJobMonitor(jobType);
_perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
[3/4] helix git commit: Add AbortedJobCount in JobMonitor
Posted by lx...@apache.org.
Add AbortedJobCount in JobMonitor
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f5705dc9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f5705dc9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f5705dc9
Branch: refs/heads/helix-0.6.x
Commit: f5705dc9201716543c72427c004f1f64211e304e
Parents: 4c88f81
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:36:34 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:36:34 2016 -0700
----------------------------------------------------------------------
.../org/apache/helix/monitoring/mbeans/JobMonitor.java | 9 +++++++++
.../apache/helix/monitoring/mbeans/JobMonitorMBean.java | 10 ++++++++--
.../main/java/org/apache/helix/task/TaskRebalancer.java | 2 ++
3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f5705dc9/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 96e05ca..8fca450 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -32,6 +32,7 @@ public class JobMonitor implements JobMonitorMBean {
private long _successfullJobCount;
private long _failedJobCount;
+ private long _abortedJobCount;
private long _existingJobGauge;
private long _queuedJobGauge;
private long _runningJobGauge;
@@ -41,6 +42,7 @@ public class JobMonitor implements JobMonitorMBean {
_jobType = jobType;
_successfullJobCount = 0L;
_failedJobCount = 0L;
+ _abortedJobCount = 0L;
_existingJobGauge = 0L;
_queuedJobGauge = 0L;
_runningJobGauge = 0L;
@@ -57,6 +59,11 @@ public class JobMonitor implements JobMonitorMBean {
}
@Override
+ public long getAbortedJobCount() {
+ return _abortedJobCount;
+ }
+
+ @Override
public long getExistingJobGauge() {
return _existingJobGauge;
}
@@ -89,6 +96,8 @@ public class JobMonitor implements JobMonitorMBean {
_failedJobCount++;
} else if (to.equals(TaskState.COMPLETED)) {
_successfullJobCount++;
+ } else if (to.equals(TaskState.ABORTED)) {
+ _abortedJobCount++;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f5705dc9/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
index 31a2643..5d30ec9 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -27,18 +27,24 @@ import org.apache.helix.monitoring.SensorNameProvider;
public interface JobMonitorMBean extends SensorNameProvider {
/**
- * Get numbers of the succeeded jobs
+ * Get number of the succeeded jobs
* @return
*/
public long getSuccessfulJobCount();
/**
- * Get numbers of failed jobs
+ * Get number of failed jobs
* @return
*/
public long getFailedJobCount();
/**
+ * Get number of the aborted jobs
+ * @return
+ */
+ public long getAbortedJobCount();
+
+ /**
* Get number of existing jobs registered
* @return
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/f5705dc9/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 6abb3f4..54f7bd5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -87,6 +87,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
for (String jobToFail : cfg.getJobDag().getAllNodes()) {
if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
ctx.setJobState(jobToFail, TaskState.ABORTED);
+ _clusterStatusMonitor
+ .updateJobCounters(TaskUtil.getJobCfg(_manager, jobToFail), TaskState.ABORTED);
}
}
return true;
[2/4] helix git commit: Fix Workflow and Job metrics Counters
Posted by lx...@apache.org.
Fix Workflow and Job metrics Counters
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4c88f815
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4c88f815
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4c88f815
Branch: refs/heads/helix-0.6.x
Commit: 4c88f815e996d896a1582a59c7df83e0f3d20285
Parents: 516ff0d
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:35:40 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:35:40 2016 -0700
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 9 +--
.../monitoring/mbeans/ClusterStatusMonitor.java | 58 ++++++++++++++------
.../helix/monitoring/mbeans/JobMonitor.java | 54 +++++++++---------
.../monitoring/mbeans/JobMonitorMBean.java | 6 --
.../monitoring/mbeans/WorkflowMonitor.java | 55 +++++++++----------
.../monitoring/mbeans/WorkflowMonitorMBean.java | 6 --
.../org/apache/helix/task/JobRebalancer.java | 11 +---
.../org/apache/helix/task/TaskRebalancer.java | 2 +
.../apache/helix/task/WorkflowRebalancer.java | 16 +-----
9 files changed, 104 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index fb30f0d..8016e44 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,8 +28,6 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;
-import javax.management.MalformedObjectNameException;
-
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
@@ -273,11 +271,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
} else {
if (_clusterStatusMonitor == null) {
_clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
- TaskDriver driver = new TaskDriver(manager);
- _clusterStatusMonitor.setWorkflowsStatus(driver);
- _clusterStatusMonitor.setJobsStatus(driver);
}
-
+ TaskDriver driver = new TaskDriver(manager);
+ _clusterStatusMonitor.refreshWorkflowsStatus(driver);
+ _clusterStatusMonitor.refreshJobsStatus(driver);
event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 55b774d..9b13e76 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.log4j.Logger;
@@ -394,21 +395,36 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public void setWorkflowsStatus(TaskDriver driver) {
+ public void refreshWorkflowsStatus(TaskDriver driver) {
+ for (WorkflowMonitor workflowMonitor : _perTypeWorkflowMonitorMap.values()) {
+ workflowMonitor.resetGauges();
+ }
+
Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
for (String workflow : workflowConfigMap.keySet()) {
if (workflowConfigMap.get(workflow).isRecurring()) {
continue;
}
WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
- TaskState toState =
+ TaskState currentState =
workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getWorkflowState();
- updateWorkflowStatus(workflowConfigMap.get(workflow), null, toState);
+ updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
}
}
- public void updateWorkflowStatus(WorkflowConfig workflowConfig, TaskState from, TaskState to) {
+ public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to) {
+ String workflowType = workflowConfig.getWorkflowType();
+ workflowType = preProcessWorkflow(workflowType);
+ _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to);
+ }
+
+ private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState current) {
String workflowType = workflowConfig.getWorkflowType();
+ workflowType = preProcessWorkflow(workflowType);
+ _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowGauges(current);
+ }
+
+ private String preProcessWorkflow(String workflowType) {
if (workflowType == null || workflowType.length() == 0) {
workflowType = DEFAULT_WORKFLOW_JOB_TYPE;
}
@@ -422,28 +438,37 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
_perTypeWorkflowMonitorMap.put(workflowType, monitor);
}
-
- _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowStats(from, to);
+ return workflowType;
}
- public void setJobsStatus(TaskDriver driver) {
+ public void refreshJobsStatus(TaskDriver driver) {
+ for (JobMonitor jobMonitor : _perTypeJobMonitorMap.values()) {
+ jobMonitor.resetJobGauge();
+ }
for (String workflow : driver.getWorkflows().keySet()) {
Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
-
for (String job : allJobs) {
- TaskState toState = null;
- if (workflowContext != null) {
- toState = workflowContext.getJobState(job);
- }
- toState = toState == null ? TaskState.NOT_STARTED : toState;
- updateJobStatus(driver.getJobConfig(job), null, toState);
+ TaskState currentState =
+ workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
+ updateJobGauges(driver.getJobConfig(job), currentState);
}
}
}
- public void updateJobStatus(JobConfig jobConfig, TaskState from, TaskState to) {
+ public void updateJobCounters(JobConfig jobConfig, TaskState to) {
+ String jobType = jobConfig.getJobType();
+ jobType = preProcessJobMonitor(jobType);
+ _perTypeJobMonitorMap.get(jobType).updateJobCounters(to);
+ }
+
+ private void updateJobGauges(JobConfig jobConfig, TaskState current) {
String jobType = jobConfig.getJobType();
+ jobType = preProcessJobMonitor(jobType);
+ _perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
+ }
+
+ private String preProcessJobMonitor(String jobType) {
if (jobType == null || jobType.length() == 0) {
jobType = DEFAULT_WORKFLOW_JOB_TYPE;
}
@@ -457,8 +482,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
_perTypeJobMonitorMap.put(jobType, monitor);
}
-
- _perTypeJobMonitorMap.get(jobType).updateJobStats(from, to);
+ return jobType;
}
private synchronized void registerInstances(Collection<InstanceMonitor> instances)
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 5754b8d..96e05ca 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -20,6 +20,8 @@ package org.apache.helix.monitoring.mbeans;
*/
import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
public class JobMonitor implements JobMonitorMBean {
@@ -28,7 +30,6 @@ public class JobMonitor implements JobMonitorMBean {
private String _clusterName;
private String _jobType;
- private long _allJobCount;
private long _successfullJobCount;
private long _failedJobCount;
private long _existingJobGauge;
@@ -38,7 +39,6 @@ public class JobMonitor implements JobMonitorMBean {
public JobMonitor(String clusterName, String jobType) {
_clusterName = clusterName;
_jobType = jobType;
- _allJobCount = 0L;
_successfullJobCount = 0L;
_failedJobCount = 0L;
_existingJobGauge = 0L;
@@ -47,11 +47,6 @@ public class JobMonitor implements JobMonitorMBean {
}
@Override
- public long getAllJobCount() {
- return _allJobCount;
- }
-
- @Override
public long getSuccessfulJobCount() {
return _successfullJobCount;
}
@@ -86,33 +81,36 @@ public class JobMonitor implements JobMonitorMBean {
}
/**
- * Update job metrics with transition state
- * @param from The from state of job, just created when it is null
+ * Update job counters with transition state
* @param to The to state of job, cleaned by ZK when it is null
*/
- public void updateJobStats(TaskState from, TaskState to) {
- if (from == null) {
- // From null means a new job has been created
- _existingJobGauge++;
- _queuedJobGauge++;
- _allJobCount++;
- } else if (from.equals(TaskState.NOT_STARTED)) {
- // From NOT_STARTED means queued job number has been decreased one
- _queuedJobGauge--;
- } else if (from.equals(TaskState.IN_PROGRESS)) {
- // From IN_PROGRESS means running job number has been decreased one
- _runningJobGauge--;
+ public void updateJobCounters(TaskState to) {
+ if (to.equals(TaskState.FAILED)) {
+ _failedJobCount++;
+ } else if (to.equals(TaskState.COMPLETED)) {
+ _successfullJobCount++;
}
+ }
- if (to == null) {
- // To null means the job has been cleaned from ZK
- _existingJobGauge--;
+ /**
+ * Reset job gauges
+ */
+ public void resetJobGauge() {
+ _queuedJobGauge = 0L;
+ _existingJobGauge = 0L;
+ _runningJobGauge = 0L;
+ }
+
+ /**
+ * Refresh job gauges
+ * @param to The current state of job
+ */
+ public void updateJobGauge(TaskState to) {
+ _existingJobGauge++;
+ if (to == null || to.equals(TaskState.NOT_STARTED)) {
+ _queuedJobGauge++;
} else if (to.equals(TaskState.IN_PROGRESS)) {
_runningJobGauge++;
- } else if (to.equals(TaskState.FAILED)) {
- _failedJobCount++;
- } else if (to.equals(TaskState.COMPLETED)) {
- _successfullJobCount++;
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
index 2685096..31a2643 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -27,12 +27,6 @@ import org.apache.helix.monitoring.SensorNameProvider;
public interface JobMonitorMBean extends SensorNameProvider {
/**
- * Get numbers of all job count
- * @return
- */
- public long getAllJobCount();
-
- /**
* Get numbers of the succeeded jobs
* @return
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index 631c650..5789677 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -27,7 +27,6 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
private String _clusterName;
private String _workflowType;
- private long _allWorkflowCount;
private long _successfulWorkflowCount;
private long _failedWorkflowCount;
private long _existingWorkflowGauge;
@@ -38,7 +37,6 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
public WorkflowMonitor(String clusterName, String workflowType) {
_clusterName = clusterName;
_workflowType = workflowType;
- _allWorkflowCount = 0L;
_successfulWorkflowCount = 0L;
_failedWorkflowCount = 0L;
_existingWorkflowGauge = 0L;
@@ -47,11 +45,6 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
}
@Override
- public long getAllWorkflowCount() {
- return _allWorkflowCount;
- }
-
- @Override
public long getSuccessfulWorkflowCount() {
return _successfulWorkflowCount;
}
@@ -83,34 +76,38 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
public String getWorkflowType() {
return _workflowType;
}
+
/**
* Update workflow with transition state
- * @param from The from state of a workflow, created when it is null
- * @param to The to state of a workflow, cleaned by ZK when it is null
+ * @param to The to state of a workflow
*/
- public void updateWorkflowStats(TaskState from, TaskState to) {
- if (from == null) {
- // From null means a new workflow has been created
- _allWorkflowCount++;
- _queuedWorkflowGauge++;
- _existingWorkflowGauge++;
- } else if (from.equals(TaskState.NOT_STARTED)) {
- // From NOT_STARTED means queued workflow number has been decreased one
- _queuedWorkflowGauge--;
- } else if (from.equals(TaskState.IN_PROGRESS)) {
- // From IN_PROGRESS means running workflow number has been decreased one
- _runningWorkflowGauge--;
- }
-
- if (to == null) {
- // To null means the job has been cleaned from ZK
- _existingWorkflowGauge--;
- } else if (to.equals(TaskState.IN_PROGRESS)) {
- _runningWorkflowGauge++;
- } else if (to.equals(TaskState.FAILED)) {
+ public void updateWorkflowCounters(TaskState to) {
+ if (to.equals(TaskState.FAILED)) {
_failedWorkflowCount++;
} else if (to.equals(TaskState.COMPLETED)) {
_successfulWorkflowCount++;
}
}
+
+ /**
+ * Reset gauges
+ */
+ public void resetGauges() {
+ _existingWorkflowGauge = 0L;
+ _runningWorkflowGauge = 0L;
+ _queuedWorkflowGauge = 0L;
+ }
+
+ /**
+ * Refresh gauges via transition state
+ * @param current current workflow state
+ */
+ public void updateWorkflowGauges(TaskState current) {
+ if (current == null || current.equals(TaskState.NOT_STARTED)) {
+ _queuedWorkflowGauge++;
+ } else if (current.equals(TaskState.IN_PROGRESS)) {
+ _runningWorkflowGauge++;
+ }
+ _existingWorkflowGauge++;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
index a8746ad..8d7076c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
@@ -27,12 +27,6 @@ import org.apache.helix.monitoring.SensorNameProvider;
public interface WorkflowMonitorMBean extends SensorNameProvider {
/**
- * Get number of all workflows registered
- * @return
- */
- public long getAllWorkflowCount();
-
- /**
* Get number of succeeded workflows
* @return
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 7676dab..a36cb85 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -64,7 +64,6 @@ public class JobRebalancer extends TaskRebalancer {
LOG.error("Job configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
- _clusterStatusMonitor.updateJobStatus(jobCfg, null, TaskState.NOT_STARTED);
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
@@ -98,7 +97,6 @@ public class JobRebalancer extends TaskRebalancer {
workflowResource, jobName, workflowState, jobState));
cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
_scheduledRebalancer.removeScheduledRebalance(jobName);
- _clusterStatusMonitor.updateJobStatus(jobCfg, jobState, null);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -197,15 +195,11 @@ public class JobRebalancer extends TaskRebalancer {
// Workflow has been stopped if all in progress jobs are stopped
if (isWorkflowStopped(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
- _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.STOPPED);
-
}
} else {
workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
// Workflow is in progress if any task is in progress
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
- _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
-
}
// Used to keep track of tasks that have already been assigned to instances.
@@ -231,7 +225,7 @@ public class JobRebalancer extends TaskRebalancer {
jobCtx.setInfo(failureMsg);
markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
- _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.FAILED);
+ _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
return new ResourceAssignment(jobResource);
}
@@ -349,6 +343,7 @@ public class JobRebalancer extends TaskRebalancer {
// to fail the job immediately
if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) {
markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
+ _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
markAllPartitionsError(jobCtx, currState, false);
addAllPartitions(allPartitions, partitionsToDropFromIs);
@@ -390,7 +385,7 @@ public class JobRebalancer extends TaskRebalancer {
if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
- _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED);
+ _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
// remove IdealState of this job
cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1517c5a..6abb3f4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -83,6 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
failedJobs ++;
if (failedJobs > cfg.getFailureThreshold()) {
ctx.setWorkflowState(TaskState.FAILED);
+ _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED);
for (String jobToFail : cfg.getJobDag().getAllNodes()) {
if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
ctx.setJobState(jobToFail, TaskState.ABORTED);
@@ -98,6 +99,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (!incomplete && cfg.isTerminable()) {
ctx.setWorkflowState(TaskState.COMPLETED);
+ _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED);
return true;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 9a3f7d8..f72f1a8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -63,7 +63,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
workflowCtx.setStartTime(System.currentTimeMillis());
LOG.debug("Workflow context is created for " + workflow);
- _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, null, TaskState.NOT_STARTED);
}
// Clean up if workflow marked for deletion
@@ -71,7 +70,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (targetState == TargetState.DELETE) {
LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
cleanupWorkflow(workflow, workflowCfg);
- _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, TaskState.COMPLETED, null);
return buildEmptyAssignment(workflow, currStateOutput);
}
@@ -85,8 +83,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
&& isWorkflowFinished(workflowCtx, workflowCfg)) {
workflowCtx.setFinishTime(currentTime);
- _clusterStatusMonitor
- .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED);
TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
}
@@ -97,8 +93,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
cleanupWorkflow(workflow, workflowCfg);
- _clusterStatusMonitor
- .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.FAILED);
} else {
// schedule future cleanup work
long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
@@ -170,14 +164,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
HelixAdmin admin = _manager.getClusterManagmentTool();
IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
- TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
- new ZNRecord(TaskUtil.USER_CONTENT_NODE));
if (jobIS != null) {
LOG.info("Job " + jobResource + " idealstate already exists!");
return;
}
// Set up job resource based on partitions from target resource
+ TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
+ new ZNRecord(TaskUtil.USER_CONTENT_NODE));
int numIndependentTasks = jobConfig.getTaskConfigMap().size();
int numPartitions = numIndependentTasks;
@@ -293,12 +287,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
try {
// Start the cloned workflow
driver.start(clonedWf);
- _clusterStatusMonitor
- .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
} catch (Exception e) {
LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
_clusterStatusMonitor
- .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.FAILED);
+ .updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
}
// Persist workflow start regardless of success to avoid retrying and failing
workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
@@ -314,8 +306,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (scheduledTime > 0 && currentTime > scheduledTime) {
_scheduledRebalancer.removeScheduledRebalance(workflow);
}
- _clusterStatusMonitor
- .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
return true;
}
} else {