You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/01 19:38:08 UTC

[gobblin] branch master updated: [GOBBLIN-1790] Add and change appropriate job status fields for observability events (#3649)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c6239f74 [GOBBLIN-1790] Add and change appropriate job status fields for observability events (#3649)
5c6239f74 is described below

commit 5c6239f74853d3fd7f84fa3e33e9c402c3f470ac
Author: William Lo <lo...@gmail.com>
AuthorDate: Wed Mar 1 11:38:00 2023 -0800

    [GOBBLIN-1790] Add and change appropriate job status fields for observability events (#3649)
    
    * Add and change appropriate job status fields for observability events
    
    * Add workunit planning time duration and address review
    
    * Add modification time to event metadata
    
    * fix tests
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 +
 .../avro/GaaSObservabilityEventExperimental.avsc   | 18 ++++++
 .../apache/gobblin/metrics/event/TimingEvent.java  |  4 ++
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java |  2 +
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 12 +++-
 .../gobblin/runtime/AbstractJobLauncher.java       |  4 +-
 .../modules/orchestration/TimingEventUtils.java    |  6 +-
 .../service/modules/spec/JobExecutionPlan.java     |  4 +-
 .../monitoring/GaaSObservabilityEventProducer.java | 18 +++---
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  9 ++-
 .../spec/JobExecutionPlanDagFactoryTest.java       | 29 ++++++++-
 .../monitoring/GaaSObservabilityProducerTest.java  | 70 ++++++++++++++++++++--
 12 files changed, 158 insertions(+), 20 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index fe62d5eec..4de8177ed 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -130,6 +130,7 @@ public class ConfigurationKeys {
    */
   public static final String FLOW_NAME_KEY = "flow.name";
   public static final String FLOW_GROUP_KEY = "flow.group";
+  public static final String FLOW_EDGE_ID_KEY = "flow.edgeId";
   public static final String FLOW_DESCRIPTION_KEY = "flow.description";
   public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
   public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
@@ -139,6 +140,7 @@ public class ConfigurationKeys {
   public static final String FLOW_EXPLAIN_KEY = "flow.explain";
   public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
   public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
+  public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
 
   /**
    * Common topology configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 11cd927d3..9710e7b1e 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -86,6 +86,24 @@
       "doc": "Finish time of the job in millis since Epoch, null if the job was never run",
       "compliance": "NONE"
     },
+    {
+      "name": "jobPlanningPhaseStartTime",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the workunit planning phase in millis since Epoch, null if the job was never run or fails to reach this phase",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobPlanningPhaseEndTime",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "End time of the workunit planning phase in millis since Epoch, null if the job was never run or fails to reach this phase",
+      "compliance": "NONE"
+    },
     {
       "name": "executionUserUrn",
       "type": [
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index c6751a79b..39bf7de11 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -81,6 +81,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
     public static final String FLOW_NAME_FIELD = "flowName";
     public static final String FLOW_GROUP_FIELD = "flowGroup";
     public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
+    public static final String FLOW_EDGE_FIELD = "flowEdge";
+    public static final String FLOW_MODIFICATION_TIME_FIELD = "flowModificationTime";
     public static final String JOB_NAME_FIELD = "jobName";
     public static final String JOB_GROUP_FIELD = "jobGroup";
     public static final String JOB_TAG_FIELD = "jobTag";
@@ -105,6 +107,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
   public static final String JOB_ORCHESTRATED_TIME = "jobOrchestratedTime";
   public static final String JOB_START_TIME = "jobStartTime";
   public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
+  public static final String WORKUNIT_PLAN_START_TIME = "workunitPlanStartTime";
+  public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
   public static final String JOB_END_TIME = "jobEndTime";
   public static final String JOB_LAST_PROGRESS_EVENT_TIME = "jobLastProgressEventTime";
   public static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage";
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index c26d3ef89..5da49e90b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -68,6 +68,7 @@ import org.apache.gobblin.runtime.listeners.CompositeJobListener;
 import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.services.MetricsReportingService;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
@@ -415,6 +416,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
         jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
     metadataTags.add(new Tag<>(TimingEvent.METADATA_MESSAGE, jobExecutionUrl));
 
+    metadataTags.add(new Tag<>(AzkabanProjectConfig.USER_TO_PROXY, jobProps.getProperty(AzkabanProjectConfig.USER_TO_PROXY, "")));
     LOG.debug(String.format("AzkabanJobLauncher.addAdditionalMetadataTags: metadataTags %s", metadataTags));
 
     return metadataTags;
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index b9a3521a8..5c78989a5 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -522,6 +522,7 @@ public class KafkaAvroJobStatusMonitorTest {
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
+        createWorkUnitTimingEvent(),
         createJobSucceededEvent()
     ).forEach(event -> {
       context.submitEvent(event);
@@ -545,6 +546,7 @@ public class KafkaAvroJobStatusMonitorTest {
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
+    getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
@@ -555,7 +557,8 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
     Assert.assertEquals(event1.getFlowName(), this.flowName);
     Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
-
+    Assert.assertEquals(event1.getJobPlanningPhaseStartTime(), Long.valueOf(2));
+    Assert.assertEquals(event1.getJobPlanningPhaseEndTime(), Long.valueOf(3));
     jobStatusMonitor.shutDown();
   }
 
@@ -680,9 +683,16 @@ public class KafkaAvroJobStatusMonitorTest {
     event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
     event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
     return event;
+  }
 
+  private GobblinTrackingEvent createWorkUnitTimingEvent() {
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.METADATA_START_TIME, "2");
+    metadata.put(TimingEvent.METADATA_END_TIME, "3");
+    return createGTE(TimingEvent.RunJobTimings.WORK_UNITS_PREPARATION, metadata);
   }
 
+
   private GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata) {
     String namespace = "org.apache.gobblin.metrics";
     Long timestamp = System.currentTimeMillis();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index e7c038fdf..217c95b47 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -431,8 +431,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
     String jobId = this.jobContext.getJobId();
     final JobState jobState = this.jobContext.getJobState();
     boolean isWorkUnitsEmpty = false;
-    TimingEvent workUnitsCreationTimer =
-        this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
     try {
       MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
       MDC.put(ConfigurationKeys.JOB_KEY_KEY, this.jobContext.getJobKey());
@@ -463,6 +461,8 @@ public abstract class AbstractJobLauncher implements JobLauncher {
             ((SourceDecorator<?, ?>) source).getEventBus().register(this);
           }
         }
+        TimingEvent workUnitsCreationTimer =
+            this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
         WorkUnitStream workUnitStream;
         if (source instanceof WorkUnitStreamSource) {
           workUnitStream = ((WorkUnitStreamSource) source).getWorkunitStream(jobState);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 01b6ff493..65b464c88 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -59,11 +59,15 @@ class TimingEventUtils {
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_TAG_FIELD, ConfigUtils.getString(jobSpec.getConfig(), ConfigurationKeys.JOB_TAG_KEY, null));
-    jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
+    jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getUri().toString());
     jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
     jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));
     jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, Integer.toString(jobExecutionPlan.getCurrentGeneration()));
     jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
+        ConfigUtils.getString(jobSpec.getConfig(), ConfigurationKeys.FLOW_EDGE_ID_KEY, ""));
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, Long.toString(
+        ConfigUtils.getLong(jobSpec.getConfig(), FlowSpec.MODIFICATION_TIME_KEY, 0L)));
 
     return jobMetadata;
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index e04e7cb90..beccb1a13 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -103,10 +103,10 @@ public class JobExecutionPlan {
       String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
       String flowInputPath = ConfigUtils.getString(flowConfig, DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX
           + "." + DatasetDescriptorConfigKeys.PATH_KEY, "");
+      Long flowModTime = ConfigUtils.getLong(flowConfig, FlowSpec.MODIFICATION_TIME_KEY, 0L);
 
       String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
       String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
-
       // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
       // job names are assumed to be unique within a dag.
       int hash = flowInputPath.hashCode();
@@ -140,6 +140,8 @@ public class JobExecutionPlan {
           .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
           //Add flow failure option
           .withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, ConfigValueFactory.fromAnyRef(flowFailureOption))
+          .withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(edgeId))
+          .withValue(FlowSpec.MODIFICATION_TIME_KEY, ConfigValueFactory.fromAnyRef(flowModTime))
       );
 
       //Add tracking config to JobSpec.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index 80599d997..e3e5ae11d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -40,7 +40,7 @@ import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.service.ExecutionStatus;
-
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 
 
 /**
@@ -90,6 +90,9 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
   private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(final State jobState) {
     Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
     Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    Long jobOrchestratedTime = jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
+    Long jobPlanningPhaseStartTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
+    Long jobPlanningPhaseEndTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
     GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
     List<Issue> issueList = null;
     try {
@@ -105,19 +108,20 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
     builder.setTimestamp(System.currentTimeMillis())
         .setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
         .setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        .setFlowGraphEdgeId(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, ""))
         .setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        .setLastFlowModificationTime(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, 0))
         .setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
         .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setExecutorId(jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, ""))
         .setJobStartTime(jobStartTime)
         .setJobEndTime(jobEndTime)
+        .setJobOrchestratedTime(jobOrchestratedTime)
+        .setJobPlanningPhaseStartTime(jobPlanningPhaseStartTime)
+        .setJobPlanningPhaseEndTime(jobPlanningPhaseEndTime)
         .setIssues(issueList)
         .setJobStatus(status)
-        // TODO: Populate the below fields in a separate PR
-        .setExecutionUserUrn(null)
-        .setExecutorId("")
-        .setLastFlowModificationTime(0)
-        .setFlowGraphEdgeId("")
-        .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+        .setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null));
     return builder.build();
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 20328c36d..23de601a1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -127,6 +127,11 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
       case TimingEvent.FlowTimings.FLOW_COMPILED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
         break;
+      case TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION:
+        properties.put(TimingEvent.WORKUNIT_PLAN_START_TIME, properties.getProperty(TimingEvent.METADATA_START_TIME));
+        properties.put(TimingEvent.WORKUNIT_PLAN_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+        break;
+      case TimingEvent.LauncherTimings.JOB_START:
       case TimingEvent.FlowTimings.FLOW_RUNNING:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
         break;
@@ -141,11 +146,9 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
         properties.put(TimingEvent.JOB_ORCHESTRATED_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
-      case TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION:
       case TimingEvent.LauncherTimings.JOB_PREPARE:
-      case TimingEvent.LauncherTimings.JOB_START:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
-        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_START_TIME));
         break;
       case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
       case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 7463a2565..83cb05f02 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -77,6 +77,7 @@ public class JobExecutionPlanDagFactoryTest {
     properties = new Properties();
     properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir");
     properties.put("specExecInstance.capabilities", "source:destination");
+    properties.put(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, "testSpecExecutorInstanceUri");
     Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties);
     this.specExecutor = new InMemorySpecExecutor(specExecutorConfig);
   }
@@ -89,7 +90,8 @@ public class JobExecutionPlanDagFactoryTest {
       Config config = jobTemplate.getRawTemplateConfig()
           .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef("testFlowName"))
           .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("testFlowGroup"))
-          .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+          .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()))
+          .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef("source:destination:edgeName1"));
 
       String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
       jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(config).
@@ -111,6 +113,10 @@ public class JobExecutionPlanDagFactoryTest {
     Assert.assertEquals(endNodeName, "job4");
     templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
     Assert.assertEquals(templateUri, "job4.job");
+    String flowEdgeId = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
+    Assert.assertEquals(flowEdgeId, "source:destination:edgeName1");
+    String specExecutorId = dag.getStartNodes().get(0).getValue().getSpecExecutor().getUri().toString();
+    Assert.assertEquals(specExecutorId, "testSpecExecutorInstanceUri");
 
     Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0);
     List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode);
@@ -227,7 +233,28 @@ public class JobExecutionPlanDagFactoryTest {
     Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
 
     Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
+  }
+
+  @Test
+  public void testCreateJobSpecAdditionalProps() throws Exception {
+    long currentTime = System.currentTimeMillis();
+    Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?")
+        .addPrimitive(FlowSpec.MODIFICATION_TIME_KEY, currentTime).build();
+
+    Config jobConfig = ConfigBuilder.create()
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+    FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+    JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+        ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+    Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
 
+    Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getLong(FlowSpec.MODIFICATION_TIME_KEY), currentTime);
+    Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EDGE_ID_KEY), "source:destination:edgeName1");
   }
 
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index e97d56524..2055d455e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 
 
 public class GaaSObservabilityProducerTest {
@@ -48,7 +49,7 @@ public class GaaSObservabilityProducerTest {
   private MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
 
   @Test
-  public void testCreateGaaSObservabilityEvent() throws Exception {
+  public void testCreateGaaSObservabilityEventWithFullMetadata() throws Exception {
     String flowGroup = "testFlowGroup1";
     String flowName = "testFlowName1";
     String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
@@ -61,13 +62,18 @@ public class GaaSObservabilityProducerTest {
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowExecutionId);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdge");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, "specExecutor");
+    gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
     gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
-    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
-    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(TimingEvent.JOB_START_TIME, "20");
+    gteEventMetadata.put(TimingEvent.JOB_END_TIME, "100");
     gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+    gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, "20");
 
     Properties jobStatusProps = new Properties();
     jobStatusProps.putAll(gteEventMetadata);
@@ -85,6 +91,62 @@ public class GaaSObservabilityProducerTest {
     Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
     Assert.assertEquals(event.getExecutorUrl(), "hostName");
     Assert.assertEquals(event.getIssues().size(), 1);
+    Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+    Assert.assertEquals(event.getExecutorId(), "specExecutor");
+    Assert.assertEquals(event.getExecutionUserUrn(), "azkabanUser");
+    Assert.assertEquals(event.getJobOrchestratedTime(), Long.valueOf(1));
+    Assert.assertEquals(event.getLastFlowModificationTime(), Long.valueOf(20));
+    Assert.assertEquals(event.getJobStartTime(), Long.valueOf(20));
+    Assert.assertEquals(event.getJobEndTime(), Long.valueOf(100));
+
+    AvroSerializer<GaaSObservabilityEventExperimental> serializer = new AvroBinarySerializer<>(
+        GaaSObservabilityEventExperimental.SCHEMA$, new NoopSchemaVersionWriter()
+    );
+    serializer.serializeRecord(event);
+  }
+
+  @Test
+  public void testCreateGaaSObservabilityEventWithPartialMetadata() throws Exception {
+    String flowGroup = "testFlowGroup2";
+    String flowName = "testFlowName2";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSObservabilityEventProducer producer = new MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdge");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, "specExecutor");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSObservabilityEventExperimental> emittedEvents = producer.getTestEmittedEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
+    Assert.assertEquals(event.getIssues().size(), 1);
+    Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+    Assert.assertEquals(event.getExecutorId(), "specExecutor");
+    Assert.assertEquals(event.getJobOrchestratedTime(), null);
+    Assert.assertEquals(event.getJobStartTime(), null);
+    Assert.assertEquals(event.getExecutionUserUrn(), null);
+    Assert.assertEquals(event.getExecutorUrl(), null);
 
     AvroSerializer<GaaSObservabilityEventExperimental> serializer = new AvroBinarySerializer<>(
         GaaSObservabilityEventExperimental.SCHEMA$, new NoopSchemaVersionWriter()