You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/29 01:38:11 UTC

[1/4] helix git commit: Add ABORT state in TaskState

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 9fc6c540b -> dca1ed05b


Add ABORT state in TaskState

1. Add ABORTED state to TaskState
2. Set IN_PROGRESS jobs to ABORTED when the workflow fails


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/516ff0dc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/516ff0dc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/516ff0dc

Branch: refs/heads/helix-0.6.x
Commit: 516ff0dcad13cea578168b41752b62e45d43362d
Parents: 9fc6c54
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:28:58 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:28:58 2016 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/TaskRebalancer.java   |  5 +++
 .../java/org/apache/helix/task/TaskState.java   |  6 +++-
 .../TestGenericTaskAssignmentCalculator.java    | 36 ++++++++++++++++++++
 3 files changed, 46 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 22f91e7..1517c5a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -83,6 +83,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         failedJobs ++;
         if (failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
+          for (String jobToFail : cfg.getJobDag().getAllNodes()) {
+            if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
+              ctx.setJobState(jobToFail, TaskState.ABORTED);
+            }
+          }
           return true;
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index dac5df9..1000a9b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -43,5 +43,9 @@ public enum TaskState {
   /**
    * All the task partitions have completed normally.
    */
-  COMPLETED
+  COMPLETED,
+  /**
+   * The task are aborted due to workflow fail
+   */
+  ABORTED
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index 5645009..7cff051 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -39,6 +39,7 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterSetup;
 import org.testng.Assert;
@@ -54,6 +55,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
   private Map<String, Integer> _runCounts = Maps.newHashMap();
   private TaskConfig _taskConfig;
   private Map<String, String> _jobCommandMap;
+  private boolean failTask;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -110,6 +112,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
 
   @Test
   public void testMultipleJobAssignment() throws InterruptedException {
+    failTask = false;
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
@@ -130,6 +133,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
 
   @Test
   public void testMultipleTaskAssignment() throws InterruptedException {
+    failTask = false;
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
 
@@ -148,6 +152,35 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
     Assert.assertEquals(_runCounts.size(), 5);
   }
 
+  @Test
+  public void testAbortTaskForWorkflowFail()
+      throws InterruptedException {
+    failTask = true;
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    taskConfigs.add(_taskConfig);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(_jobCommandMap);
+
+    for (int i = 0; i < 5; i++) {
+      workflowBuilder.addJob("JOB" + i, jobBuilder);
+    }
+
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
+
+    int abortedTask = 0;
+    for (TaskState jobState : _driver.getWorkflowContext(workflowName).getJobStates().values()) {
+      if (jobState == TaskState.ABORTED) {
+        abortedTask++;
+      }
+    }
+
+    Assert.assertEquals(abortedTask, 4);
+  }
+
   private class TaskOne extends MockTask {
     private final String _instanceName;
 
@@ -165,6 +198,9 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
     public TaskResult run() {
       _invokedClasses.add(getClass().getName());
       _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
+      if (failTask) {
+        return new TaskResult(TaskResult.Status.FAILED, "");
+      }
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     }
   }


[4/4] helix git commit: Fix NPE when first time call WorkflowRebalancer

Posted by lx...@apache.org.
Fix NPE on the first call to WorkflowRebalancer, when the job config may not be ready yet


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dca1ed05
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dca1ed05
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dca1ed05

Branch: refs/heads/helix-0.6.x
Commit: dca1ed05bfccce69814fdb59b095d734d1c82de0
Parents: f5705dc
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:37:12 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:37:12 2016 -0700

----------------------------------------------------------------------
 .../apache/helix/monitoring/mbeans/ClusterStatusMonitor.java    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/dca1ed05/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 9b13e76..7f996c5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -463,6 +463,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   private void updateJobGauges(JobConfig jobConfig, TaskState current) {
+    // When first time for WorkflowRebalancer call, jobconfig may not ready.
+    // Thus only check it for gauge.
+    if (jobConfig == null) {
+      return;
+    }
     String jobType = jobConfig.getJobType();
     jobType = preProcessJobMonitor(jobType);
     _perTypeJobMonitorMap.get(jobType).updateJobGauge(current);


[3/4] helix git commit: Add AbortedJobCount in JobMonior

Posted by lx...@apache.org.
Add AbortedJobCount to JobMonitor


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f5705dc9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f5705dc9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f5705dc9

Branch: refs/heads/helix-0.6.x
Commit: f5705dc9201716543c72427c004f1f64211e304e
Parents: 4c88f81
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:36:34 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:36:34 2016 -0700

----------------------------------------------------------------------
 .../org/apache/helix/monitoring/mbeans/JobMonitor.java    |  9 +++++++++
 .../apache/helix/monitoring/mbeans/JobMonitorMBean.java   | 10 ++++++++--
 .../main/java/org/apache/helix/task/TaskRebalancer.java   |  2 ++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f5705dc9/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 96e05ca..8fca450 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -32,6 +32,7 @@ public class JobMonitor implements JobMonitorMBean {
 
   private long _successfullJobCount;
   private long _failedJobCount;
+  private long _abortedJobCount;
   private long _existingJobGauge;
   private long _queuedJobGauge;
   private long _runningJobGauge;
@@ -41,6 +42,7 @@ public class JobMonitor implements JobMonitorMBean {
     _jobType = jobType;
     _successfullJobCount = 0L;
     _failedJobCount = 0L;
+    _abortedJobCount = 0L;
     _existingJobGauge = 0L;
     _queuedJobGauge = 0L;
     _runningJobGauge = 0L;
@@ -57,6 +59,11 @@ public class JobMonitor implements JobMonitorMBean {
   }
 
   @Override
+  public long getAbortedJobCount() {
+    return _abortedJobCount;
+  }
+
+  @Override
   public long getExistingJobGauge() {
     return _existingJobGauge;
   }
@@ -89,6 +96,8 @@ public class JobMonitor implements JobMonitorMBean {
       _failedJobCount++;
     } else if (to.equals(TaskState.COMPLETED)) {
       _successfullJobCount++;
+    } else if (to.equals(TaskState.ABORTED)) {
+      _abortedJobCount++;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/f5705dc9/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
index 31a2643..5d30ec9 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -27,18 +27,24 @@ import org.apache.helix.monitoring.SensorNameProvider;
 public interface JobMonitorMBean extends SensorNameProvider {
 
   /**
-   * Get numbers of the succeeded jobs
+   * Get number of the succeeded jobs
    * @return
    */
   public long getSuccessfulJobCount();
 
   /**
-   * Get numbers of failed jobs
+   * Get number of failed jobs
    * @return
    */
   public long getFailedJobCount();
 
   /**
+   * Get number of the aborted jobs
+   * @return
+   */
+  public long getAbortedJobCount();
+
+  /**
    * Get number of existing jobs registered
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/f5705dc9/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 6abb3f4..54f7bd5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -87,6 +87,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
+              _clusterStatusMonitor
+                  .updateJobCounters(TaskUtil.getJobCfg(_manager, jobToFail), TaskState.ABORTED);
             }
           }
           return true;


[2/4] helix git commit: Fix Workflow and Job metrics Counters

Posted by lx...@apache.org.
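Fix workflow and job metric counters: counters are now incremented only on state transitions (FAILED/COMPLETED), while gauges (existing/queued/running) are reset and recomputed from the current state on every controller refresh.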


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4c88f815
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4c88f815
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4c88f815

Branch: refs/heads/helix-0.6.x
Commit: 4c88f815e996d896a1582a59c7df83e0f3d20285
Parents: 516ff0d
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 22 16:35:40 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Sep 22 16:35:40 2016 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  9 +--
 .../monitoring/mbeans/ClusterStatusMonitor.java | 58 ++++++++++++++------
 .../helix/monitoring/mbeans/JobMonitor.java     | 54 +++++++++---------
 .../monitoring/mbeans/JobMonitorMBean.java      |  6 --
 .../monitoring/mbeans/WorkflowMonitor.java      | 55 +++++++++----------
 .../monitoring/mbeans/WorkflowMonitorMBean.java |  6 --
 .../org/apache/helix/task/JobRebalancer.java    | 11 +---
 .../org/apache/helix/task/TaskRebalancer.java   |  2 +
 .../apache/helix/task/WorkflowRebalancer.java   | 16 +-----
 9 files changed, 104 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index fb30f0d..8016e44 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,8 +28,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.management.MalformedObjectNameException;
-
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
@@ -273,11 +271,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       } else {
         if (_clusterStatusMonitor == null) {
           _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
-          TaskDriver driver = new TaskDriver(manager);
-          _clusterStatusMonitor.setWorkflowsStatus(driver);
-          _clusterStatusMonitor.setJobsStatus(driver);
         }
-
+        TaskDriver driver = new TaskDriver(manager);
+        _clusterStatusMonitor.refreshWorkflowsStatus(driver);
+        _clusterStatusMonitor.refreshJobsStatus(driver);
         event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 55b774d..9b13e76 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.log4j.Logger;
@@ -394,21 +395,36 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void setWorkflowsStatus(TaskDriver driver) {
+  public void refreshWorkflowsStatus(TaskDriver driver) {
+    for (WorkflowMonitor workflowMonitor : _perTypeWorkflowMonitorMap.values()) {
+      workflowMonitor.resetGauges();
+    }
+
     Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
     for (String workflow : workflowConfigMap.keySet()) {
       if (workflowConfigMap.get(workflow).isRecurring()) {
         continue;
       }
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
-      TaskState toState =
+      TaskState currentState =
           workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getWorkflowState();
-      updateWorkflowStatus(workflowConfigMap.get(workflow), null, toState);
+      updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
     }
   }
 
-  public void updateWorkflowStatus(WorkflowConfig workflowConfig, TaskState from, TaskState to) {
+  public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to) {
+    String workflowType = workflowConfig.getWorkflowType();
+    workflowType = preProcessWorkflow(workflowType);
+    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to);
+  }
+
+  private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState current) {
     String workflowType = workflowConfig.getWorkflowType();
+    workflowType = preProcessWorkflow(workflowType);
+    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowGauges(current);
+  }
+
+  private String preProcessWorkflow(String workflowType) {
     if (workflowType == null || workflowType.length() == 0) {
       workflowType = DEFAULT_WORKFLOW_JOB_TYPE;
     }
@@ -422,28 +438,37 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       _perTypeWorkflowMonitorMap.put(workflowType, monitor);
     }
-
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowStats(from, to);
+    return workflowType;
   }
 
-  public void setJobsStatus(TaskDriver driver) {
+  public void refreshJobsStatus(TaskDriver driver) {
+    for (JobMonitor jobMonitor : _perTypeJobMonitorMap.values()) {
+      jobMonitor.resetJobGauge();
+    }
     for (String workflow : driver.getWorkflows().keySet()) {
       Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
-
       for (String job : allJobs) {
-        TaskState toState = null;
-        if (workflowContext != null) {
-          toState = workflowContext.getJobState(job);
-        }
-        toState = toState == null ? TaskState.NOT_STARTED : toState;
-        updateJobStatus(driver.getJobConfig(job), null, toState);
+        TaskState currentState =
+            workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
+        updateJobGauges(driver.getJobConfig(job), currentState);
       }
     }
   }
 
-  public void updateJobStatus(JobConfig jobConfig, TaskState from, TaskState to) {
+  public void updateJobCounters(JobConfig jobConfig, TaskState to) {
+    String jobType = jobConfig.getJobType();
+    jobType = preProcessJobMonitor(jobType);
+    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to);
+  }
+
+  private void updateJobGauges(JobConfig jobConfig, TaskState current) {
     String jobType = jobConfig.getJobType();
+    jobType = preProcessJobMonitor(jobType);
+    _perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
+  }
+
+  private String preProcessJobMonitor(String jobType) {
     if (jobType == null || jobType.length() == 0) {
       jobType = DEFAULT_WORKFLOW_JOB_TYPE;
     }
@@ -457,8 +482,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       _perTypeJobMonitorMap.put(jobType, monitor);
     }
-
-    _perTypeJobMonitorMap.get(jobType).updateJobStats(from, to);
+    return jobType;
   }
 
   private synchronized void registerInstances(Collection<InstanceMonitor> instances)

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 5754b8d..96e05ca 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -20,6 +20,8 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 
 public class JobMonitor implements JobMonitorMBean {
 
@@ -28,7 +30,6 @@ public class JobMonitor implements JobMonitorMBean {
   private String _clusterName;
   private String _jobType;
 
-  private long _allJobCount;
   private long _successfullJobCount;
   private long _failedJobCount;
   private long _existingJobGauge;
@@ -38,7 +39,6 @@ public class JobMonitor implements JobMonitorMBean {
   public JobMonitor(String clusterName, String jobType) {
     _clusterName = clusterName;
     _jobType = jobType;
-    _allJobCount = 0L;
     _successfullJobCount = 0L;
     _failedJobCount = 0L;
     _existingJobGauge = 0L;
@@ -47,11 +47,6 @@ public class JobMonitor implements JobMonitorMBean {
   }
 
   @Override
-  public long getAllJobCount() {
-    return _allJobCount;
-  }
-
-  @Override
   public long getSuccessfulJobCount() {
     return _successfullJobCount;
   }
@@ -86,33 +81,36 @@ public class JobMonitor implements JobMonitorMBean {
   }
 
   /**
-   * Update job metrics with transition state
-   * @param from The from state of job, just created when it is null
+   * Update job counters with transition state
    * @param to The to state of job, cleaned by ZK when it is null
    */
-  public void updateJobStats(TaskState from, TaskState to) {
-    if (from == null) {
-      // From null means a new job has been created
-      _existingJobGauge++;
-      _queuedJobGauge++;
-      _allJobCount++;
-    } else if (from.equals(TaskState.NOT_STARTED)) {
-      // From NOT_STARTED means queued job number has been decreased one
-      _queuedJobGauge--;
-    } else if (from.equals(TaskState.IN_PROGRESS)) {
-      // From IN_PROGRESS means running job number has been decreased one
-      _runningJobGauge--;
+  public void updateJobCounters(TaskState to) {
+    if (to.equals(TaskState.FAILED)) {
+      _failedJobCount++;
+    } else if (to.equals(TaskState.COMPLETED)) {
+      _successfullJobCount++;
     }
+  }
 
-    if (to == null) {
-      // To null means the job has been cleaned from ZK
-      _existingJobGauge--;
+  /**
+   * Reset job gauges
+   */
+  public void resetJobGauge() {
+    _queuedJobGauge = 0L;
+    _existingJobGauge = 0L;
+    _runningJobGauge = 0L;
+  }
+
+  /**
+   * Refresh job gauges
+   * @param to The current state of job
+   */
+  public void updateJobGauge(TaskState to) {
+    _existingJobGauge++;
+    if (to == null || to.equals(TaskState.NOT_STARTED)) {
+      _queuedJobGauge++;
     } else if (to.equals(TaskState.IN_PROGRESS)) {
       _runningJobGauge++;
-    } else if (to.equals(TaskState.FAILED)) {
-      _failedJobCount++;
-    } else if (to.equals(TaskState.COMPLETED)) {
-      _successfullJobCount++;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
index 2685096..31a2643 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -27,12 +27,6 @@ import org.apache.helix.monitoring.SensorNameProvider;
 public interface JobMonitorMBean extends SensorNameProvider {
 
   /**
-   * Get numbers of all job count
-   * @return
-   */
-  public long getAllJobCount();
-
-  /**
    * Get numbers of the succeeded jobs
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index 631c650..5789677 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -27,7 +27,6 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
   private String _clusterName;
   private String _workflowType;
 
-  private long _allWorkflowCount;
   private long _successfulWorkflowCount;
   private long _failedWorkflowCount;
   private long _existingWorkflowGauge;
@@ -38,7 +37,6 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
   public WorkflowMonitor(String clusterName, String workflowType) {
     _clusterName = clusterName;
     _workflowType = workflowType;
-    _allWorkflowCount = 0L;
     _successfulWorkflowCount = 0L;
     _failedWorkflowCount = 0L;
     _existingWorkflowGauge = 0L;
@@ -47,11 +45,6 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
   }
 
   @Override
-  public long getAllWorkflowCount() {
-    return _allWorkflowCount;
-  }
-
-  @Override
   public long getSuccessfulWorkflowCount() {
     return _successfulWorkflowCount;
   }
@@ -83,34 +76,38 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
   public String getWorkflowType() {
     return _workflowType;
   }
+
   /**
    * Update workflow with transition state
-   * @param from The from state of a workflow, created when it is null
-   * @param to The to state of a workflow, cleaned by ZK when it is null
+   * @param to The to state of a workflow
    */
-  public void updateWorkflowStats(TaskState from, TaskState to) {
-    if (from == null) {
-      // From null means a new workflow has been created
-      _allWorkflowCount++;
-      _queuedWorkflowGauge++;
-      _existingWorkflowGauge++;
-    } else if (from.equals(TaskState.NOT_STARTED)) {
-      // From NOT_STARTED means queued workflow number has been decreased one
-      _queuedWorkflowGauge--;
-    } else if (from.equals(TaskState.IN_PROGRESS)) {
-      // From IN_PROGRESS means running workflow number has been decreased one
-      _runningWorkflowGauge--;
-    }
-
-    if (to == null) {
-      // To null means the job has been cleaned from ZK
-      _existingWorkflowGauge--;
-    } else if (to.equals(TaskState.IN_PROGRESS)) {
-      _runningWorkflowGauge++;
-    } else if (to.equals(TaskState.FAILED)) {
+  public void updateWorkflowCounters(TaskState to) {
+   if (to.equals(TaskState.FAILED)) {
       _failedWorkflowCount++;
     } else if (to.equals(TaskState.COMPLETED)) {
       _successfulWorkflowCount++;
     }
   }
+
+  /**
+   * Reset gauges
+   */
+  public void resetGauges() {
+    _existingWorkflowGauge = 0L;
+    _runningWorkflowGauge = 0L;
+    _queuedWorkflowGauge = 0L;
+  }
+
+  /**
+   * Refresh gauges via transition state
+   * @param current current workflow state
+   */
+  public void updateWorkflowGauges(TaskState current) {
+    if (current == null || current.equals(TaskState.NOT_STARTED)) {
+      _queuedWorkflowGauge++;
+    } else if (current.equals(TaskState.IN_PROGRESS)) {
+      _runningWorkflowGauge++;
+    }
+    _existingWorkflowGauge++;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
index a8746ad..8d7076c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
@@ -27,12 +27,6 @@ import org.apache.helix.monitoring.SensorNameProvider;
 public interface WorkflowMonitorMBean extends SensorNameProvider {
 
   /**
-   * Get number of all workflows registered
-   * @return
-   */
-  public long getAllWorkflowCount();
-
-  /**
    * Get number of succeeded workflows
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 7676dab..a36cb85 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -64,7 +64,6 @@ public class JobRebalancer extends TaskRebalancer {
       LOG.error("Job configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
-    _clusterStatusMonitor.updateJobStatus(jobCfg, null, TaskState.NOT_STARTED);
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
@@ -98,7 +97,6 @@ public class JobRebalancer extends TaskRebalancer {
           workflowResource, jobName, workflowState, jobState));
       cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       _scheduledRebalancer.removeScheduledRebalance(jobName);
-      _clusterStatusMonitor.updateJobStatus(jobCfg, jobState, null);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -197,15 +195,11 @@ public class JobRebalancer extends TaskRebalancer {
       // Workflow has been stopped if all in progress jobs are stopped
       if (isWorkflowStopped(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
-        _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.STOPPED);
-
       }
     } else {
       workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
       // Workflow is in progress if any task is in progress
       workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-      _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
-
     }
 
     // Used to keep track of tasks that have already been assigned to instances.
@@ -231,7 +225,7 @@ public class JobRebalancer extends TaskRebalancer {
       jobCtx.setInfo(failureMsg);
       markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
       markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
-      _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.FAILED);
+      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
       return new ResourceAssignment(jobResource);
     }
 
@@ -349,6 +343,7 @@ public class JobRebalancer extends TaskRebalancer {
             // to fail the job immediately
             if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) {
               markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
+              _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
               markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);
 
@@ -390,7 +385,7 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
-      _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED);
+      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
       // remove IdealState of this job
       cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1517c5a..6abb3f4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -83,6 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         failedJobs ++;
         if (failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
+          _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED);
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
@@ -98,6 +99,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
+      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED);
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4c88f815/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 9a3f7d8..f72f1a8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -63,7 +63,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
       workflowCtx.setStartTime(System.currentTimeMillis());
       LOG.debug("Workflow context is created for " + workflow);
-      _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, null, TaskState.NOT_STARTED);
     }
 
     // Clean up if workflow marked for deletion
@@ -71,7 +70,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
       cleanupWorkflow(workflow, workflowCfg);
-      _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, TaskState.COMPLETED, null);
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -85,8 +83,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
         && isWorkflowFinished(workflowCtx, workflowCfg)) {
       workflowCtx.setFinishTime(currentTime);
-      _clusterStatusMonitor
-          .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED);
       TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     }
 
@@ -97,8 +93,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
         LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
         cleanupWorkflow(workflow, workflowCfg);
-        _clusterStatusMonitor
-            .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.FAILED);
       } else {
         // schedule future cleanup work
         long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
@@ -170,14 +164,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
     HelixAdmin admin = _manager.getClusterManagmentTool();
 
     IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
-    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
-        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
     if (jobIS != null) {
       LOG.info("Job " + jobResource + " idealstate already exists!");
       return;
     }
 
     // Set up job resource based on partitions from target resource
+    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
+        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
     int numIndependentTasks = jobConfig.getTaskConfigMap().size();
 
     int numPartitions = numIndependentTasks;
@@ -293,12 +287,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
           try {
             // Start the cloned workflow
             driver.start(clonedWf);
-            _clusterStatusMonitor
-                .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
           } catch (Exception e) {
             LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
             _clusterStatusMonitor
-                .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.FAILED);
+                .updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
           }
           // Persist workflow start regardless of success to avoid retrying and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
@@ -314,8 +306,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
         if (scheduledTime > 0 && currentTime > scheduledTime) {
           _scheduledRebalancer.removeScheduledRebalance(workflow);
         }
-        _clusterStatusMonitor
-            .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
         return true;
       }
     } else {