You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/25 23:44:21 UTC
[2/6] helix git commit: Add JobTypes in WorkflowConfig and improve
monitor performance
Add JobTypes in WorkflowConfig and improve monitor performance
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8ec6246
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8ec6246
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8ec6246
Branch: refs/heads/master
Commit: c8ec6246ffdbd377e2ec1c58404ed1bbffb076d1
Parents: ad760ae
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Sep 25 14:39:04 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Sep 25 14:39:04 2017 -0700
----------------------------------------------------------------------
.../monitoring/mbeans/ClusterStatusMonitor.java | 15 ++++-----
.../java/org/apache/helix/task/TaskDriver.java | 35 ++++++++++++++++----
2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c8ec6246/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 468a0ce..bae3a60 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -483,12 +483,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
if (workflow.isEmpty()) {
continue;
}
- Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
+ WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow);
+ Set<String> allJobs = workflowConfig.getJobDag().getAllNodes();
WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
for (String job : allJobs) {
- TaskState currentState =
- workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
- updateJobGauges(driver.getJobConfig(job), currentState);
+ TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
+ updateJobGauges(workflowConfig.getJobTypes() == null ? null : workflowConfig.getJobTypes().get(job),
+ currentState);
}
}
}
@@ -499,13 +500,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_perTypeJobMonitorMap.get(jobType).updateJobCounters(to);
}
- private void updateJobGauges(JobConfig jobConfig, TaskState current) {
+ private void updateJobGauges(String jobType, TaskState current) {
// When first time for WorkflowRebalancer call, jobconfig may not ready.
// Thus only check it for gauge.
- if (jobConfig == null) {
- return;
- }
- String jobType = jobConfig.getJobType();
jobType = preProcessJobMonitor(jobType);
_perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c8ec6246/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index d3dba5b..a639cd0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -202,19 +202,28 @@ public class TaskDriver {
LOG.info("Starting workflow " + flow.getName());
flow.validate();
- // first, add workflow config.
- if (!TaskUtil.setResourceConfig(_accessor, flow.getName(),
- new WorkflowConfig(flow.getWorkflowConfig(), flow.getName()))) {
- LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
- }
+ WorkflowConfig newWorkflowConfig =
+ new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap())
+ .setWorkflowId(flow.getName()).build();
- // then add all job configs.
+ Map<String, String> jobTypes = new HashMap<String, String>();
+ // add all job configs.
for (String job : flow.getJobConfigs().keySet()) {
JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job));
}
- addJobConfig(job, jobCfgBuilder.build());
+ JobConfig jobCfg = jobCfgBuilder.build();
+ if (jobCfg.getJobType() != null) {
+ jobTypes.put(job, jobCfg.getJobType());
+ }
+ addJobConfig(job, jobCfg);
+ }
+ newWorkflowConfig.setJobTypes(jobTypes);
+
+ // add workflow config.
+ if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+ LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
}
// Finally add workflow resource.
@@ -549,6 +558,7 @@ public class TaskDriver {
// add job config first.
addJobConfig(namespacedJobName, jobConfig);
+ final String jobType = jobConfig.getJobType();
// Add the job to the end of the queue in the DAG
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@@ -580,6 +590,17 @@ public class TaskDriver {
jobDag.addParentToChild(candidate, namespacedJobName);
}
+ // Add job type if job type is not null
+ if (jobType != null) {
+ Map<String, String> jobTypes =
+ currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+ if (jobTypes == null) {
+ jobTypes = new HashMap<String, String>();
+ }
+ jobTypes.put(jobName, jobType);
+ currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
+ }
+
// Save the updated DAG
try {
currentData