You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/25 23:44:21 UTC

[2/6] helix git commit: Add JobTypes in WorkflowConfig and improve monitor performance

Add JobTypes in WorkflowConfig and improve monitor performance


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8ec6246
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8ec6246
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8ec6246

Branch: refs/heads/master
Commit: c8ec6246ffdbd377e2ec1c58404ed1bbffb076d1
Parents: ad760ae
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Sep 25 14:39:04 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Sep 25 14:39:04 2017 -0700

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterStatusMonitor.java | 15 ++++-----
 .../java/org/apache/helix/task/TaskDriver.java  | 35 ++++++++++++++++----
 2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c8ec6246/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 468a0ce..bae3a60 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -483,12 +483,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       if (workflow.isEmpty()) {
         continue;
       }
-      Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
+      WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow);
+      Set<String> allJobs = workflowConfig.getJobDag().getAllNodes();
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
       for (String job : allJobs) {
-        TaskState currentState =
-            workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
-        updateJobGauges(driver.getJobConfig(job), currentState);
+        TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
+        updateJobGauges(workflowConfig.getJobTypes() == null ? null : workflowConfig.getJobTypes().get(job),
+            currentState);
       }
     }
   }
@@ -499,13 +500,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _perTypeJobMonitorMap.get(jobType).updateJobCounters(to);
   }
 
-  private void updateJobGauges(JobConfig jobConfig, TaskState current) {
+  private void updateJobGauges(String jobType, TaskState current) {
     // When first time for WorkflowRebalancer call, jobconfig may not ready.
     // Thus only check it for gauge.
-    if (jobConfig == null) {
-      return;
-    }
-    String jobType = jobConfig.getJobType();
     jobType = preProcessJobMonitor(jobType);
     _perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c8ec6246/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index d3dba5b..a639cd0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -202,19 +202,28 @@ public class TaskDriver {
     LOG.info("Starting workflow " + flow.getName());
     flow.validate();
 
-    // first, add workflow config.
-    if (!TaskUtil.setResourceConfig(_accessor, flow.getName(),
-        new WorkflowConfig(flow.getWorkflowConfig(), flow.getName()))) {
-      LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
-    }
+    WorkflowConfig newWorkflowConfig =
+        new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap())
+            .setWorkflowId(flow.getName()).build();
 
-    // then add all job configs.
+    Map<String, String> jobTypes = new HashMap<String, String>();
+    // add all job configs.
     for (String job : flow.getJobConfigs().keySet()) {
       JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
       if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
         jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job));
       }
-      addJobConfig(job, jobCfgBuilder.build());
+      JobConfig jobCfg = jobCfgBuilder.build();
+      if (jobCfg.getJobType() != null) {
+        jobTypes.put(job, jobCfg.getJobType());
+      }
+      addJobConfig(job, jobCfg);
+    }
+    newWorkflowConfig.setJobTypes(jobTypes);
+
+    // add workflow config.
+    if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+      LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
     }
 
     // Finally add workflow resource.
@@ -549,6 +558,7 @@ public class TaskDriver {
 
     // add job config first.
     addJobConfig(namespacedJobName, jobConfig);
+    final String jobType = jobConfig.getJobType();
 
     // Add the job to the end of the queue in the DAG
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@@ -580,6 +590,17 @@ public class TaskDriver {
           jobDag.addParentToChild(candidate, namespacedJobName);
         }
 
+        // Add job type if job type is not null
+        if (jobType != null) {
+          Map<String, String> jobTypes =
+              currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+          if (jobTypes == null) {
+            jobTypes = new HashMap<String, String>();
+          }
+          jobTypes.put(jobName, jobType);
+          currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
+        }
+
         // Save the updated DAG
         try {
           currentData