You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/27 20:26:11 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-881] Add job
tag field that can be used to filter job statuses
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4795e [GOBBLIN-881] Add job tag field that can be used to filter job statuses
5d4795e is described below
commit 5d4795e28fe7502b0e2233fc830c2b3f7adeb637
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Sep 27 13:26:03 2019 -0700
[GOBBLIN-881] Add job tag field that can be used to filter job statuses
Closes #2735 from jack-moseley/job-tags
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
...apache.gobblin.service.flowstatuses.restspec.json | 4 ++++
.../org/apache/gobblin/service/JobStatus.pdsc | 6 ++++++
...apache.gobblin.service.flowstatuses.snapshot.json | 9 +++++++++
.../org/apache/gobblin/service/FlowStatusClient.java | 7 ++++---
.../org/apache/gobblin/service/FlowStatusTest.java | 17 +++++++++++++----
.../apache/gobblin/service/FlowStatusResource.java | 8 +++++---
.../service/monitoring/FlowStatusGenerator.java | 20 +++++++++++++++-----
.../apache/gobblin/service/monitoring/JobStatus.java | 1 +
.../service/monitoring/JobStatusRetriever.java | 3 ++-
.../modules/orchestration/TimingEventUtils.java | 2 ++
12 files changed, 63 insertions(+), 16 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3538c89..a485427 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -152,6 +152,7 @@ public class ConfigurationKeys {
*/
public static final String JOB_NAME_KEY = "job.name";
public static final String JOB_GROUP_KEY = "job.group";
+ public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index d09629a..a16da82 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -78,6 +78,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
public static final String JOB_NAME_FIELD = "jobName";
public static final String JOB_GROUP_FIELD = "jobGroup";
+ public static final String JOB_TAG_FIELD = "jobTag";
public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
public static final String LOW_WATERMARK_FIELD = "lowWatermark";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
index 9912e43..6e6b67e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
@@ -24,6 +24,10 @@
"name" : "count",
"type" : "int",
"optional" : true
+ }, {
+ "name" : "tag",
+ "type" : "string",
+ "optional" : true
} ]
} ],
"entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc
index 7e8e48a..f13d366 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc
@@ -15,6 +15,12 @@
"doc" : "Identifier of the job"
},
{
+ "name" : "jobTag",
+ "type" : "string",
+ "optional" : true,
+ "doc" : "Tag of the job"
+ },
+ {
"name" : "executionStatus",
"type" : "ExecutionStatus",
"doc" : "Job execution status"
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index d84ea73..a161726 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -144,6 +144,11 @@
"type" : "JobId",
"doc" : "Identifier of the job"
}, {
+ "name" : "jobTag",
+ "type" : "string",
+ "doc" : "Tag of the job",
+ "optional" : true
+ }, {
"name" : "executionStatus",
"type" : "ExecutionStatus",
"doc" : "Job execution status"
@@ -225,6 +230,10 @@
"name" : "count",
"type" : "int",
"optional" : true
+ }, {
+ "name" : "tag",
+ "type" : "string",
+ "optional" : true
} ]
} ],
"entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
index 08a9fbe..3da1547 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
@@ -131,16 +131,17 @@ public class FlowStatusClient implements Closeable {
/**
* Get the latest flow status
* @param flowId identifier of flow status to get
- * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions.
+ * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, containing only
+ * jobStatuses that match the given tag.
* @throws RemoteInvocationException
*/
- public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count)
+ public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count, String tag)
throws RemoteInvocationException {
LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName() + " count " + Integer.toString(count));
FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
- addReqParam("count", count, Integer.class).build();
+ addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build();
Response<CollectionResponse<FlowStatus>> response =
_restClient.get().sendRequest(findRequest).getResponse();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 6a86607..b008f83 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -125,14 +125,18 @@ public class FlowStatusTest {
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(2000L).endTime(6000L)
+ .flowName("flow1").jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2")
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
+ org.apache.gobblin.service.monitoring.JobStatus js3 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+ .flowName("flow1").jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 3")
+ .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
org.apache.gobblin.service.monitoring.JobStatus fs2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1, fs1);
- List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, fs2);
+ List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, js3, fs2);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList1);
_listOfJobStatusLists.add(jobStatusList2);
@@ -144,7 +148,7 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1L);
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
- Assert.assertEquals(flowStatus.getMessage(), js2.getMessage());
+ Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js2.getMessage(), js3.getMessage()));
Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.COMPLETE);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -158,10 +162,15 @@ public class FlowStatusTest {
compareJobStatus(js, mjs);
}
- List<FlowStatus> flowStatusList = _client.getLatestFlowStatus(flowId, 2);
+ List<FlowStatus> flowStatusList = _client.getLatestFlowStatus(flowId, 2, null);
Assert.assertEquals(flowStatusList.size(), 2);
Assert.assertEquals(flowStatusList.get(0).getId().getFlowExecutionId(), (Long) 1L);
Assert.assertEquals(flowStatusList.get(1).getId().getFlowExecutionId(), (Long) 0L);
+ Assert.assertEquals(flowStatusList.get(0).getJobStatuses().size(), 2);
+
+ List<FlowStatus> flowStatusList2 = _client.getLatestFlowStatus(flowId, 1, "dataset1");
+ Assert.assertEquals(flowStatusList2.get(0).getJobStatuses().size(), 1);
+ Assert.assertEquals(flowStatusList2.get(0).getJobStatuses().get(0).getJobTag(), "dataset1");
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 9cd9508..767efa4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
+import com.linkedin.data.template.SetMode;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.server.PagingContext;
@@ -68,7 +69,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
org.apache.gobblin.service.monitoring.FlowStatus flowStatus =
- _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId);
+ _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
// this returns null to raise a 404 error if flowStatus is null
return convertFlowStatus(flowStatus);
@@ -76,14 +77,14 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
@Finder("latestFlowStatus")
public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
- @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count) {
+ @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
if (count == null) {
count = 1;
}
LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
- _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count);
+ _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
if (flowStatuses != null) {
return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
@@ -128,6 +129,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
jobStatus.setFlowId(flowId)
.setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
.setJobGroup(queriedJobStatus.getJobGroup()))
+ .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
.setExecutionStatistics(new JobStatistics()
.setExecutionStartTime(queriedJobStatus.getStartTime())
.setExecutionEndTime(queriedJobStatus.getEndTime())
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index b334c3e..c757485 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import lombok.Builder;
@@ -43,16 +44,18 @@ public class FlowStatusGenerator {
* @param flowName
* @param flowGroup
* @param count
+ * @param tag
* @return the latest <code>count</code>{@link FlowStatus}es. null is returned if there is no flow execution found.
+ * If tag is not null, the job status list only contains jobs matching the tag.
*/
- public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count) {
+ public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count, String tag) {
List<Long> flowExecutionIds = jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, count);
if (flowExecutionIds == null || flowExecutionIds.isEmpty()) {
return null;
}
List<FlowStatus> flowStatuses =
- flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId))
+ flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId, tag))
.collect(Collectors.toList());
return flowStatuses;
@@ -63,13 +66,20 @@ public class FlowStatusGenerator {
* @param flowName
* @param flowGroup
* @param flowExecutionId
- * @return the flow status, null is returned if the flow status does not exist
+ * @param tag String to filter the returned job statuses
+ * @return the flow status, null is returned if the flow status does not exist. If tag is not null, the job status
+ * list only contains jobs matching the tag.
*/
- public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExecutionId) {
+ public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExecutionId, String tag) {
FlowStatus flowStatus = null;
Iterator<JobStatus> jobStatusIterator =
jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ if (tag != null) {
+ jobStatusIterator = Iterators.filter(jobStatusIterator, js -> JobStatusRetriever.isFlowStatus(js) ||
+ (js.getJobTag() != null && js.getJobTag().equals(tag)));
+ }
+
if (jobStatusIterator.hasNext()) {
flowStatus = new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatusIterator);
}
@@ -85,7 +95,7 @@ public class FlowStatusGenerator {
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup) {
- List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1);
+ List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
} else {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 4f3dc2f..9a07407 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -32,6 +32,7 @@ import lombok.Getter;
public class JobStatus {
private final String jobName;
private final String jobGroup;
+ private final String jobTag;
private final long jobExecutionId;
private final long flowExecutionId;
private final String flowName;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 09068d6..fd9bdcb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -71,6 +71,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String jobTag = jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
@@ -84,7 +85,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
- jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
+ jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
shouldRetry(shouldRetry).build();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index afe4820..1f65833 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
class TimingEventUtils {
@@ -57,6 +58,7 @@ class TimingEventUtils {
jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
+ jobMetadata.put(TimingEvent.FlowEventConstants.JOB_TAG_FIELD, ConfigUtils.getString(jobSpec.getConfig(), ConfigurationKeys.JOB_TAG_KEY, null));
jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));