You are viewing a plain-text version of this content. Its canonical link was a hyperlink in the original page and is not reproduced in this text.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/01 19:38:08 UTC
[gobblin] branch master updated: [GOBBLIN-1790] Add and change appropriate job status fields for observability events (#3649)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5c6239f74 [GOBBLIN-1790] Add and change appropriate job status fields for observability events (#3649)
5c6239f74 is described below
commit 5c6239f74853d3fd7f84fa3e33e9c402c3f470ac
Author: William Lo <lo...@gmail.com>
AuthorDate: Wed Mar 1 11:38:00 2023 -0800
[GOBBLIN-1790] Add and change appropriate job status fields for observability events (#3649)
* Add and change appropriate job status fields for observability events
* Add workunit planning time duration and address review comments
* Add modification time to event metadata
* Fix tests
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../avro/GaaSObservabilityEventExperimental.avsc | 18 ++++++
.../apache/gobblin/metrics/event/TimingEvent.java | 4 ++
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 2 +
.../runtime/KafkaAvroJobStatusMonitorTest.java | 12 +++-
.../gobblin/runtime/AbstractJobLauncher.java | 4 +-
.../modules/orchestration/TimingEventUtils.java | 6 +-
.../service/modules/spec/JobExecutionPlan.java | 4 +-
.../monitoring/GaaSObservabilityEventProducer.java | 18 +++---
.../monitoring/KafkaAvroJobStatusMonitor.java | 9 ++-
.../spec/JobExecutionPlanDagFactoryTest.java | 29 ++++++++-
.../monitoring/GaaSObservabilityProducerTest.java | 70 ++++++++++++++++++++--
12 files changed, 158 insertions(+), 20 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index fe62d5eec..4de8177ed 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -130,6 +130,7 @@ public class ConfigurationKeys {
*/
public static final String FLOW_NAME_KEY = "flow.name";
public static final String FLOW_GROUP_KEY = "flow.group";
+ public static final String FLOW_EDGE_ID_KEY = "flow.edgeId";
public static final String FLOW_DESCRIPTION_KEY = "flow.description";
public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
@@ -139,6 +140,7 @@ public class ConfigurationKeys {
public static final String FLOW_EXPLAIN_KEY = "flow.explain";
public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
+ public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
/**
* Common topology configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 11cd927d3..9710e7b1e 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -86,6 +86,24 @@
"doc": "Finish time of the job in millis since Epoch, null if the job was never run",
"compliance": "NONE"
},
+ {
+ "name": "jobPlanningPhaseStartTime",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the workunit planning phase in millis since Epoch, null if the job was never run or fails to reach this phase",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobPlanningPhaseEndTime",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "End time of the workunit planning phase in millis since Epoch, null if the job was never run or fails to reach this phase",
+ "compliance": "NONE"
+ },
{
"name": "executionUserUrn",
"type": [
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index c6751a79b..39bf7de11 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -81,6 +81,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String FLOW_NAME_FIELD = "flowName";
public static final String FLOW_GROUP_FIELD = "flowGroup";
public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
+ public static final String FLOW_EDGE_FIELD = "flowEdge";
+ public static final String FLOW_MODIFICATION_TIME_FIELD = "flowModificationTime";
public static final String JOB_NAME_FIELD = "jobName";
public static final String JOB_GROUP_FIELD = "jobGroup";
public static final String JOB_TAG_FIELD = "jobTag";
@@ -105,6 +107,8 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String JOB_ORCHESTRATED_TIME = "jobOrchestratedTime";
public static final String JOB_START_TIME = "jobStartTime";
public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
+ public static final String WORKUNIT_PLAN_START_TIME = "workunitPlanStartTime";
+ public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
public static final String JOB_END_TIME = "jobEndTime";
public static final String JOB_LAST_PROGRESS_EVENT_TIME = "jobLastProgressEventTime";
public static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage";
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index c26d3ef89..5da49e90b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -68,6 +68,7 @@ import org.apache.gobblin.runtime.listeners.CompositeJobListener;
import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.services.MetricsReportingService;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
@@ -415,6 +416,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
metadataTags.add(new Tag<>(TimingEvent.METADATA_MESSAGE, jobExecutionUrl));
+ metadataTags.add(new Tag<>(AzkabanProjectConfig.USER_TO_PROXY, jobProps.getProperty(AzkabanProjectConfig.USER_TO_PROXY, "")));
LOG.debug(String.format("AzkabanJobLauncher.addAdditionalMetadataTags: metadataTags %s", metadataTags));
return metadataTags;
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index b9a3521a8..5c78989a5 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -522,6 +522,7 @@ public class KafkaAvroJobStatusMonitorTest {
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
+ createWorkUnitTimingEvent(),
createJobSucceededEvent()
).forEach(event -> {
context.submitEvent(event);
@@ -545,6 +546,7 @@ public class KafkaAvroJobStatusMonitorTest {
State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+ getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
@@ -555,7 +557,8 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
Assert.assertEquals(event1.getFlowName(), this.flowName);
Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
-
+ Assert.assertEquals(event1.getJobPlanningPhaseStartTime(), Long.valueOf(2));
+ Assert.assertEquals(event1.getJobPlanningPhaseEndTime(), Long.valueOf(3));
jobStatusMonitor.shutDown();
}
@@ -680,9 +683,16 @@ public class KafkaAvroJobStatusMonitorTest {
event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
return event;
+ }
+ private GobblinTrackingEvent createWorkUnitTimingEvent() {
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put(TimingEvent.METADATA_START_TIME, "2");
+ metadata.put(TimingEvent.METADATA_END_TIME, "3");
+ return createGTE(TimingEvent.RunJobTimings.WORK_UNITS_PREPARATION, metadata);
}
+
private GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata) {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index e7c038fdf..217c95b47 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -431,8 +431,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
String jobId = this.jobContext.getJobId();
final JobState jobState = this.jobContext.getJobState();
boolean isWorkUnitsEmpty = false;
- TimingEvent workUnitsCreationTimer =
- this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
try {
MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
MDC.put(ConfigurationKeys.JOB_KEY_KEY, this.jobContext.getJobKey());
@@ -463,6 +461,8 @@ public abstract class AbstractJobLauncher implements JobLauncher {
((SourceDecorator<?, ?>) source).getEventBus().register(this);
}
}
+ TimingEvent workUnitsCreationTimer =
+ this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
WorkUnitStream workUnitStream;
if (source instanceof WorkUnitStreamSource) {
workUnitStream = ((WorkUnitStreamSource) source).getWorkunitStream(jobState);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 01b6ff493..65b464c88 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -59,11 +59,15 @@ class TimingEventUtils {
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_TAG_FIELD, ConfigUtils.getString(jobSpec.getConfig(), ConfigurationKeys.JOB_TAG_KEY, null));
- jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
+ jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getUri().toString());
jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, Integer.toString(jobExecutionPlan.getCurrentGeneration()));
jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
+ jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
+ ConfigUtils.getString(jobSpec.getConfig(), ConfigurationKeys.FLOW_EDGE_ID_KEY, ""));
+ jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, Long.toString(
+ ConfigUtils.getLong(jobSpec.getConfig(), FlowSpec.MODIFICATION_TIME_KEY, 0L)));
return jobMetadata;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index e04e7cb90..beccb1a13 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -103,10 +103,10 @@ public class JobExecutionPlan {
String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
String flowInputPath = ConfigUtils.getString(flowConfig, DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX
+ "." + DatasetDescriptorConfigKeys.PATH_KEY, "");
+ Long flowModTime = ConfigUtils.getLong(flowConfig, FlowSpec.MODIFICATION_TIME_KEY, 0L);
String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
-
// Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
// job names are assumed to be unique within a dag.
int hash = flowInputPath.hashCode();
@@ -140,6 +140,8 @@ public class JobExecutionPlan {
.withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
//Add flow failure option
.withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, ConfigValueFactory.fromAnyRef(flowFailureOption))
+ .withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(edgeId))
+ .withValue(FlowSpec.MODIFICATION_TIME_KEY, ConfigValueFactory.fromAnyRef(flowModTime))
);
//Add tracking config to JobSpec.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index 80599d997..e3e5ae11d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -40,7 +40,7 @@ import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.service.ExecutionStatus;
-
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
/**
@@ -90,6 +90,9 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(final State jobState) {
Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+ Long jobOrchestratedTime = jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
+ Long jobPlanningPhaseStartTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
+ Long jobPlanningPhaseEndTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
List<Issue> issueList = null;
try {
@@ -105,19 +108,20 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
builder.setTimestamp(System.currentTimeMillis())
.setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
.setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+ .setFlowGraphEdgeId(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, ""))
.setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+ .setLastFlowModificationTime(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, 0))
.setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
.setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+ .setExecutorId(jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, ""))
.setJobStartTime(jobStartTime)
.setJobEndTime(jobEndTime)
+ .setJobOrchestratedTime(jobOrchestratedTime)
+ .setJobPlanningPhaseStartTime(jobPlanningPhaseStartTime)
+ .setJobPlanningPhaseEndTime(jobPlanningPhaseEndTime)
.setIssues(issueList)
.setJobStatus(status)
- // TODO: Populate the below fields in a separate PR
- .setExecutionUserUrn(null)
- .setExecutorId("")
- .setLastFlowModificationTime(0)
- .setFlowGraphEdgeId("")
- .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+ .setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null));
return builder.build();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 20328c36d..23de601a1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -127,6 +127,11 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
case TimingEvent.FlowTimings.FLOW_COMPILED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
break;
+ case TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION:
+ properties.put(TimingEvent.WORKUNIT_PLAN_START_TIME, properties.getProperty(TimingEvent.METADATA_START_TIME));
+ properties.put(TimingEvent.WORKUNIT_PLAN_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+ break;
+ case TimingEvent.LauncherTimings.JOB_START:
case TimingEvent.FlowTimings.FLOW_RUNNING:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
break;
@@ -141,11 +146,9 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
properties.put(TimingEvent.JOB_ORCHESTRATED_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
- case TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION:
case TimingEvent.LauncherTimings.JOB_PREPARE:
- case TimingEvent.LauncherTimings.JOB_START:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
- properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
+ properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_START_TIME));
break;
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 7463a2565..83cb05f02 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -77,6 +77,7 @@ public class JobExecutionPlanDagFactoryTest {
properties = new Properties();
properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir");
properties.put("specExecInstance.capabilities", "source:destination");
+ properties.put(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, "testSpecExecutorInstanceUri");
Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties);
this.specExecutor = new InMemorySpecExecutor(specExecutorConfig);
}
@@ -89,7 +90,8 @@ public class JobExecutionPlanDagFactoryTest {
Config config = jobTemplate.getRawTemplateConfig()
.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef("testFlowName"))
.withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("testFlowGroup"))
- .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()))
+ .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef("source:destination:edgeName1"));
String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(config).
@@ -111,6 +113,10 @@ public class JobExecutionPlanDagFactoryTest {
Assert.assertEquals(endNodeName, "job4");
templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
Assert.assertEquals(templateUri, "job4.job");
+ String flowEdgeId = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
+ Assert.assertEquals(flowEdgeId, "source:destination:edgeName1");
+ String specExecutorId = dag.getStartNodes().get(0).getValue().getSpecExecutor().getUri().toString();
+ Assert.assertEquals(specExecutorId, "testSpecExecutorInstanceUri");
Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0);
List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode);
@@ -227,7 +233,28 @@ public class JobExecutionPlanDagFactoryTest {
Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
+ }
+
+ @Test
+ public void testCreateJobSpecAdditionalProps() throws Exception {
+ long currentTime = System.currentTimeMillis();
+ Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?")
+ .addPrimitive(FlowSpec.MODIFICATION_TIME_KEY, currentTime).build();
+
+ Config jobConfig = ConfigBuilder.create()
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+ FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+ ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+ Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
+ Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getLong(FlowSpec.MODIFICATION_TIME_KEY), currentTime);
+ Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EDGE_ID_KEY), "source:destination:edgeName1");
}
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index e97d56524..2055d455e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
public class GaaSObservabilityProducerTest {
@@ -48,7 +49,7 @@ public class GaaSObservabilityProducerTest {
private MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
@Test
- public void testCreateGaaSObservabilityEvent() throws Exception {
+ public void testCreateGaaSObservabilityEventWithFullMetadata() throws Exception {
String flowGroup = "testFlowGroup1";
String flowName = "testFlowName1";
String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
@@ -61,13 +62,18 @@ public class GaaSObservabilityProducerTest {
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowExecutionId);
gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, "specExecutor");
+ gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
- gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
- gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+ gteEventMetadata.put(TimingEvent.JOB_START_TIME, "20");
+ gteEventMetadata.put(TimingEvent.JOB_END_TIME, "100");
gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, "20");
Properties jobStatusProps = new Properties();
jobStatusProps.putAll(gteEventMetadata);
@@ -85,6 +91,62 @@ public class GaaSObservabilityProducerTest {
Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
Assert.assertEquals(event.getExecutorUrl(), "hostName");
Assert.assertEquals(event.getIssues().size(), 1);
+ Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getExecutorId(), "specExecutor");
+ Assert.assertEquals(event.getExecutionUserUrn(), "azkabanUser");
+ Assert.assertEquals(event.getJobOrchestratedTime(), Long.valueOf(1));
+ Assert.assertEquals(event.getLastFlowModificationTime(), Long.valueOf(20));
+ Assert.assertEquals(event.getJobStartTime(), Long.valueOf(20));
+ Assert.assertEquals(event.getJobEndTime(), Long.valueOf(100));
+
+ AvroSerializer<GaaSObservabilityEventExperimental> serializer = new AvroBinarySerializer<>(
+ GaaSObservabilityEventExperimental.SCHEMA$, new NoopSchemaVersionWriter()
+ );
+ serializer.serializeRecord(event);
+ }
+
+ @Test
+ public void testCreateGaaSObservabilityEventWithPartialMetadata() throws Exception {
+ String flowGroup = "testFlowGroup2";
+ String flowName = "testFlowName2";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+ String flowExecutionId = "1";
+ this.issueRepository.put(
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ );
+ MockGaaSObservabilityEventProducer producer = new MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, "specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ List<GaaSObservabilityEventExperimental> emittedEvents = producer.getTestEmittedEvents();
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+ GaaSObservabilityEventExperimental event = iterator.next();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getJobName(), jobName);
+ Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
+ Assert.assertEquals(event.getIssues().size(), 1);
+ Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getExecutorId(), "specExecutor");
+ Assert.assertEquals(event.getJobOrchestratedTime(), null);
+ Assert.assertEquals(event.getJobStartTime(), null);
+ Assert.assertEquals(event.getExecutionUserUrn(), null);
+ Assert.assertEquals(event.getExecutorUrl(), null);
AvroSerializer<GaaSObservabilityEventExperimental> serializer = new AvroBinarySerializer<>(
GaaSObservabilityEventExperimental.SCHEMA$, new NoopSchemaVersionWriter()