You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/27 20:26:11 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-881] Add job tag field that can be used to filter job statuses

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d4795e  [GOBBLIN-881] Add job tag field that can be used to filter job statuses
5d4795e is described below

commit 5d4795e28fe7502b0e2233fc830c2b3f7adeb637
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Sep 27 13:26:03 2019 -0700

    [GOBBLIN-881] Add job tag field that can be used to filter job statuses
    
    Closes #2735 from jack-moseley/job-tags
---
 .../gobblin/configuration/ConfigurationKeys.java     |  1 +
 .../apache/gobblin/metrics/event/TimingEvent.java    |  1 +
 ...apache.gobblin.service.flowstatuses.restspec.json |  4 ++++
 .../org/apache/gobblin/service/JobStatus.pdsc        |  6 ++++++
 ...apache.gobblin.service.flowstatuses.snapshot.json |  9 +++++++++
 .../org/apache/gobblin/service/FlowStatusClient.java |  7 ++++---
 .../org/apache/gobblin/service/FlowStatusTest.java   | 17 +++++++++++++----
 .../apache/gobblin/service/FlowStatusResource.java   |  8 +++++---
 .../service/monitoring/FlowStatusGenerator.java      | 20 +++++++++++++++-----
 .../apache/gobblin/service/monitoring/JobStatus.java |  1 +
 .../service/monitoring/JobStatusRetriever.java       |  3 ++-
 .../modules/orchestration/TimingEventUtils.java      |  2 ++
 12 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3538c89..a485427 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -152,6 +152,7 @@ public class ConfigurationKeys {
    */
   public static final String JOB_NAME_KEY = "job.name";
   public static final String JOB_GROUP_KEY = "job.group";
+  public static final String JOB_TAG_KEY = "job.tag";
   public static final String JOB_DESCRIPTION_KEY = "job.description";
   // Job launcher type
   public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index d09629a..a16da82 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -78,6 +78,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
     public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
     public static final String JOB_NAME_FIELD = "jobName";
     public static final String JOB_GROUP_FIELD = "jobGroup";
+    public static final String JOB_TAG_FIELD = "jobTag";
     public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
     public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
     public static final String LOW_WATERMARK_FIELD = "lowWatermark";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
index 9912e43..6e6b67e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json
@@ -24,6 +24,10 @@
         "name" : "count",
         "type" : "int",
         "optional" : true
+      }, {
+        "name" : "tag",
+        "type" : "string",
+        "optional" : true
       } ]
     } ],
     "entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc
index 7e8e48a..f13d366 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdsc
@@ -15,6 +15,12 @@
       "doc" : "Identifier of the job"
     },
     {
+      "name" : "jobTag",
+      "type" : "string",
+      "optional" : true,
+      "doc" : "Tag of the job"
+    },
+    {
       "name" : "executionStatus",
       "type" : "ExecutionStatus",
       "doc" : "Job execution status"
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index d84ea73..a161726 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -144,6 +144,11 @@
       "type" : "JobId",
       "doc" : "Identifier of the job"
     }, {
+      "name" : "jobTag",
+      "type" : "string",
+      "doc" : "Tag of the job",
+      "optional" : true
+    }, {
       "name" : "executionStatus",
       "type" : "ExecutionStatus",
       "doc" : "Job execution status"
@@ -225,6 +230,10 @@
           "name" : "count",
           "type" : "int",
           "optional" : true
+        }, {
+          "name" : "tag",
+          "type" : "string",
+          "optional" : true
         } ]
       } ],
       "entity" : {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
index 08a9fbe..3da1547 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
@@ -131,16 +131,17 @@ public class FlowStatusClient implements Closeable {
   /**
    * Get the latest flow status
    * @param flowId identifier of flow status to get
-   * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions.
+   * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, containing only
+   * jobStatuses that match the given tag.
    * @throws RemoteInvocationException
    */
-  public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count)
+  public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count, String tag)
       throws RemoteInvocationException {
     LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
         flowId.getFlowName() + " count " + Integer.toString(count));
 
     FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
-        addReqParam("count", count, Integer.class).build();
+        addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build();
 
     Response<CollectionResponse<FlowStatus>> response =
         _restClient.get().sendRequest(findRequest).getResponse();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 6a86607..b008f83 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -125,14 +125,18 @@ public class FlowStatusTest {
         .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
     org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(2000L).endTime(6000L)
+        .flowName("flow1").jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2")
         .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
+    org.apache.gobblin.service.monitoring.JobStatus js3 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+        .flowName("flow1").jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
+        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 3")
+        .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     org.apache.gobblin.service.monitoring.JobStatus fs2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
         .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1, fs1);
-    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, fs2);
+    List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, js3, fs2);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList1);
     _listOfJobStatusLists.add(jobStatusList2);
@@ -144,7 +148,7 @@ public class FlowStatusTest {
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
     Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1L);
     Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
-    Assert.assertEquals(flowStatus.getMessage(), js2.getMessage());
+    Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js2.getMessage(), js3.getMessage()));
     Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.COMPLETE);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -158,10 +162,15 @@ public class FlowStatusTest {
       compareJobStatus(js, mjs);
     }
 
-    List<FlowStatus> flowStatusList = _client.getLatestFlowStatus(flowId, 2);
+    List<FlowStatus> flowStatusList = _client.getLatestFlowStatus(flowId, 2, null);
     Assert.assertEquals(flowStatusList.size(), 2);
     Assert.assertEquals(flowStatusList.get(0).getId().getFlowExecutionId(), (Long) 1L);
     Assert.assertEquals(flowStatusList.get(1).getId().getFlowExecutionId(), (Long) 0L);
+    Assert.assertEquals(flowStatusList.get(0).getJobStatuses().size(), 2);
+
+    List<FlowStatus> flowStatusList2 = _client.getLatestFlowStatus(flowId, 1, "dataset1");
+    Assert.assertEquals(flowStatusList2.get(0).getJobStatuses().size(), 1);
+    Assert.assertEquals(flowStatusList2.get(0).getJobStatuses().get(0).getJobTag(), "dataset1");
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 9cd9508..767efa4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
+import com.linkedin.data.template.SetMode;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.server.PagingContext;
@@ -68,7 +69,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
     LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
 
     org.apache.gobblin.service.monitoring.FlowStatus flowStatus =
-        _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId);
+        _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
 
     // this returns null to raise a 404 error if flowStatus is null
     return convertFlowStatus(flowStatus);
@@ -76,14 +77,14 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
 
   @Finder("latestFlowStatus")
   public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
-      @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count) {
+      @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
     if (count == null) {
       count = 1;
     }
     LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
 
     List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
-        _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count);
+        _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
 
     if (flowStatuses != null) {
       return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
@@ -128,6 +129,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
       jobStatus.setFlowId(flowId)
           .setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
               .setJobGroup(queriedJobStatus.getJobGroup()))
+          .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
           .setExecutionStatistics(new JobStatistics()
               .setExecutionStartTime(queriedJobStatus.getStartTime())
               .setExecutionEndTime(queriedJobStatus.getEndTime())
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index b334c3e..c757485 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import lombok.Builder;
@@ -43,16 +44,18 @@ public class FlowStatusGenerator {
    * @param flowName
    * @param flowGroup
    * @param count
+   * @param tag
    * @return the latest <code>count</code>{@link FlowStatus}es. null is returned if there is no flow execution found.
+   * If tag is not null, the job status list only contains jobs matching the tag.
    */
-  public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count) {
+  public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count, String tag) {
     List<Long> flowExecutionIds = jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, count);
 
     if (flowExecutionIds == null || flowExecutionIds.isEmpty()) {
       return null;
     }
     List<FlowStatus> flowStatuses =
-        flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId))
+        flowExecutionIds.stream().map(flowExecutionId -> getFlowStatus(flowName, flowGroup, flowExecutionId, tag))
             .collect(Collectors.toList());
 
     return flowStatuses;
@@ -63,13 +66,20 @@ public class FlowStatusGenerator {
    * @param flowName
    * @param flowGroup
    * @param flowExecutionId
-   * @return the flow status, null is returned if the flow status does not exist
+   * @param tag String to filter the returned job statuses
+   * @return the flow status, null is returned if the flow status does not exist. If tag is not null, the job status
+   * list only contains jobs matching the tag.
    */
-  public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExecutionId) {
+  public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExecutionId, String tag) {
     FlowStatus flowStatus = null;
     Iterator<JobStatus> jobStatusIterator =
         jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
 
+    if (tag != null) {
+      jobStatusIterator = Iterators.filter(jobStatusIterator, js -> JobStatusRetriever.isFlowStatus(js) ||
+          (js.getJobTag() != null && js.getJobTag().equals(tag)));
+    }
+
     if (jobStatusIterator.hasNext()) {
       flowStatus = new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatusIterator);
     }
@@ -85,7 +95,7 @@ public class FlowStatusGenerator {
    * @return true, if any jobs of the flow are RUNNING.
    */
   public boolean isFlowRunning(String flowName, String flowGroup) {
-    List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1);
+    List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
     if (flowStatusList == null || flowStatusList.isEmpty()) {
       return false;
     } else {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 4f3dc2f..9a07407 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -32,6 +32,7 @@ import lombok.Getter;
 public class JobStatus {
   private final String jobName;
   private final String jobGroup;
+  private final String jobTag;
   private final long jobExecutionId;
   private final long flowExecutionId;
   private final String flowName;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 09068d6..fd9bdcb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -71,6 +71,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
     String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
     String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    String jobTag = jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
     long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
     String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
     long startTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_START_TIME, "0"));
@@ -84,7 +85,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
 
     return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-        jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
+        jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
         lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
         message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
         shouldRetry(shouldRetry).build();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index afe4820..1f65833 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 class TimingEventUtils {
@@ -57,6 +58,7 @@ class TimingEventUtils {
     jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_TAG_FIELD, ConfigUtils.getString(jobSpec.getConfig(), ConfigurationKeys.JOB_TAG_KEY, null));
     jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
     jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
     jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));