You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/20 05:49:28 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-860] Process flow-level events for setting/retrieving flow status
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 59f3bee [GOBBLIN-860] Process flow-level events for setting/retrieving flow status
59f3bee is described below
commit 59f3beeea1e62dcc1f3901f082e46bc3d237f9fb
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Aug 19 22:49:20 2019 -0700
[GOBBLIN-860] Process flow-level events for setting/retrieving flow status
Closes #2715 from jack-moseley/flow-level-events
---
.../apache/gobblin/metrics/event/TimingEvent.java | 2 +-
.../org/apache/gobblin/service/FlowStatusTest.java | 49 +++++++++-----
.../apache/gobblin/service/FlowStatusResource.java | 74 ++++++++--------------
.../service/modules/orchestration/DagManager.java | 11 ++--
.../modules/orchestration/DagManagerUtils.java | 10 +--
.../modules/orchestration/TimingEventUtils.java | 9 ++-
.../service/monitoring/FsJobStatusRetriever.java | 16 +----
.../monitoring/KafkaAvroJobStatusMonitor.java | 7 +-
.../monitoring/MysqlJobStatusRetriever.java | 14 ++--
9 files changed, 87 insertions(+), 105 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 044eba2..d09629a 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -67,9 +67,9 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static class FlowTimings {
public static final String FLOW_COMPILED = "FlowCompiled";
public static final String FLOW_COMPILE_FAILED = "FlowCompileFailed";
- public static final String FLOW_CANCEL = "FlowCancelled";
public static final String FLOW_SUCCEEDED = "FlowSucceeded";
public static final String FLOW_FAILED = "FlowFailed";
+ public static final String FLOW_RUNNING = "FlowRunning";
}
public static class FlowEventConstants {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index c908b83..6a86607 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -121,12 +121,18 @@ public class FlowStatusTest {
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
.processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
+ org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+ .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2")
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1);
- List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2);
+ org.apache.gobblin.service.monitoring.JobStatus fs2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+ .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).build();
+ List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1, fs1);
+ List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, fs2);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList1);
_listOfJobStatusLists.add(jobStatusList2);
@@ -136,14 +142,14 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 2000L);
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
Assert.assertEquals(flowStatus.getMessage(), js2.getMessage());
Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.COMPLETE);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
- Assert.assertEquals(jobStatusList2.size(), jobStatuses.size());
+ Assert.assertEquals(jobStatusList2.size(), jobStatuses.size() + 1);
for (int i = 0; i < jobStatuses.size(); i++) {
org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList2.get(i);
@@ -172,7 +178,10 @@ public class FlowStatusTest {
.flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2);
+ org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+ .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
+ List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -181,14 +190,14 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1000L);
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js1.getMessage(), js2.getMessage()));
Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.COMPLETE);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
- Assert.assertEquals(jobStatusList.size(), jobStatuses.size());
+ Assert.assertEquals(jobStatusList.size(), jobStatuses.size() + 1);
for (int i = 0; i < jobStatuses.size(); i++) {
org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList.get(i);
@@ -212,7 +221,10 @@ public class FlowStatusTest {
.flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2);
+ org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+ .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
+ .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).build();
+ List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -221,14 +233,14 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1000L);
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 0L);
Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js1.getMessage(), js2.getMessage()));
Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.RUNNING);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
- Assert.assertEquals(jobStatusList.size(), jobStatuses.size());
+ Assert.assertEquals(jobStatusList.size(), jobStatuses.size() + 1);
for (int i = 0; i < jobStatuses.size(); i++) {
org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList.get(i);
@@ -252,7 +264,10 @@ public class FlowStatusTest {
.flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2")
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2);
+ org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
+ .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+ .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).build();
+ List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -261,14 +276,14 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowGroup(), "fgroup1");
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 1000L);
- Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 6000L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(), 0L);
+ Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(), 7000L);
Assert.assertEquals(flowStatus.getMessage(), messageJoiner.join(js1.getMessage(), js2.getMessage()));
Assert.assertEquals(flowStatus.getExecutionStatus(), ExecutionStatus.FAILED);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
- Assert.assertEquals(jobStatusList.size(), jobStatuses.size());
+ Assert.assertEquals(jobStatusList.size(), jobStatuses.size() + 1);
for (int i = 0; i < jobStatuses.size(); i++) {
org.apache.gobblin.service.monitoring.JobStatus mjs = jobStatusList.get(i);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 78f28f8..70957c6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -37,6 +37,7 @@ import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
/**
@@ -106,14 +107,22 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
JobStatusArray jobStatusArray = new JobStatusArray();
FlowId flowId = new FlowId().setFlowName(monitoringFlowStatus.getFlowName())
.setFlowGroup(monitoringFlowStatus.getFlowGroup());
- long flowStartTime = Long.MAX_VALUE;
- long flowEndTime = -1L;
- // flow execution status is complete unless job status indicates it is running or failed
- ExecutionStatus flowExecutionStatus = ExecutionStatus.COMPLETE;
+
+ long flowEndTime = 0L;
+ ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
StringBuffer flowMessagesStringBuffer = new StringBuffer();
while (jobStatusIter.hasNext()) {
org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
+
+ // Check if this is the flow status instead of a single job status
+ if (isFlowStatus(queriedJobStatus)) {
+ flowEndTime = queriedJobStatus.getEndTime();
+ flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
+ continue;
+ }
+
JobStatus jobStatus = new JobStatus();
jobStatus.setFlowId(flowId)
@@ -130,22 +139,10 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
jobStatusArray.add(jobStatus);
- if (queriedJobStatus.getStartTime() < flowStartTime){
- flowStartTime = queriedJobStatus.getStartTime();
- }
-
- // TODO: end time should be left as -1 if not all jobs have started for the flow
- // need to have flow job count to determine this
- if (queriedJobStatus.getEndTime() > flowEndTime){
- flowEndTime = queriedJobStatus.getEndTime();
- }
-
if (!queriedJobStatus.getMessage().isEmpty()) {
flowMessagesStringBuffer.append(queriedJobStatus.getMessage());
flowMessagesStringBuffer.append(MESSAGE_SEPARATOR);
}
-
- flowExecutionStatus = updatedFlowExecutionStatus(jobStatus.getExecutionStatus(), flowExecutionStatus);
}
String flowMessages = flowMessagesStringBuffer.length() > 0 ?
@@ -155,7 +152,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
return new FlowStatus()
.setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
.setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
- .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(flowStartTime)
+ .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
.setExecutionEndTime(flowEndTime))
.setMessage(flowMessages)
.setExecutionStatus(flowExecutionStatus)
@@ -163,40 +160,19 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
}
/**
- * Determines the new flow status based on the current flow status and new job status
- * @param jobExecutionStatus job status
- * @param currentFlowExecutionStatus current flow status
- * @return updated flow status
+ * Check if a {@link org.apache.gobblin.service.monitoring.JobStatus} is the special job status that represents the
+ * entire flow's status
*/
- static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus jobExecutionStatus,
- ExecutionStatus currentFlowExecutionStatus) {
-
- // if any job failed or flow has failed then return failed status
- if (currentFlowExecutionStatus == ExecutionStatus.FAILED ||
- jobExecutionStatus == ExecutionStatus.FAILED) {
- return ExecutionStatus.FAILED;
- }
-
- // if any job is cancelled or flow has failed then return failed status
- if (currentFlowExecutionStatus == ExecutionStatus.CANCELLED ||
- jobExecutionStatus == ExecutionStatus.CANCELLED) {
- return ExecutionStatus.CANCELLED;
- }
-
- if (currentFlowExecutionStatus == ExecutionStatus.COMPLETE &&
- jobExecutionStatus == ExecutionStatus.PENDING) {
- return ExecutionStatus.PENDING;
- }
-
- if (currentFlowExecutionStatus == ExecutionStatus.RUNNING ||
- jobExecutionStatus == ExecutionStatus.RUNNING ||
- jobExecutionStatus == ExecutionStatus.ORCHESTRATED ||
- jobExecutionStatus == ExecutionStatus.COMPILED ||
- jobExecutionStatus == ExecutionStatus.PENDING) {
- return ExecutionStatus.RUNNING;
- }
+ private static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStatus jobStatus) {
+ return jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
+ }
- return currentFlowExecutionStatus;
+ /**
+ * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
+ * assumed to be the flow start time.
+ */
+ private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
+ return flowStatus.getFlowExecutionId();
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 1ae4fb7..2bf3ca2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -476,6 +476,10 @@ public class DagManager extends AbstractIdleService {
for (DagNode dagNode: nextSubmitted.get(dagId)) {
addJobState(dagId, dagNode);
}
+
+ // Set flow status to running
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+
log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
}
@@ -618,6 +622,7 @@ public class DagManager extends AbstractIdleService {
synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+
//Submit jobs from the dag ready for execution.
for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
submitJob(dagNode);
@@ -758,8 +763,7 @@ public class DagManager extends AbstractIdleService {
}
log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
// send an event before cleaning up dag
- JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
- DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, TimingEvent.FlowTimings.FLOW_FAILED);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
dagIdstoClean.add(dagId);
}
@@ -773,8 +777,7 @@ public class DagManager extends AbstractIdleService {
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
// send an event before cleaning up dag
- JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
- DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, status);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
dagIdstoClean.add(dagId);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5ab74e8..47caf6f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -246,10 +246,12 @@ public class DagManagerUtils {
return (int) (flowExecutionId % numThreads);
}
- static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, JobExecutionPlan jobExecutionPlan, String flowEvent) {
- if (eventSubmitter.isPresent()) {
- Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
- eventSubmitter.get().getTimingEvent(flowEvent).stop(jobMetadata);
+ static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+ if (eventSubmitter.isPresent() && !dag.isEmpty()) {
+ // Every dag node will contain the same flow metadata
+ Config config = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+ eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
}
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 6eb1a31..afe4820 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -31,12 +31,15 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
class TimingEventUtils {
static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
+ return getFlowMetadata(flowSpec.getConfig());
+ }
+
+ static Map<String, String> getFlowMetadata(Config flowConfig) {
Map<String, String> metadata = Maps.newHashMap();
- metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
- metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY));
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY));
- Config flowConfig = flowSpec.getConfig();
if (flowConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index f35e076..fba549f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -76,9 +76,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
if (jobStates.isEmpty()) {
return Iterators.emptyIterator();
}
- if (!shouldFilterJobStatus(tableNames, tableName)) {
- jobStatuses.add(getJobStatus(jobStates.get(0)));
- }
+ jobStatuses.add(getJobStatus(jobStates.get(0)));
}
return jobStatuses.iterator();
} catch (IOException e) {
@@ -131,16 +129,4 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
return null;
}
}
-
- /**
- * A helper method to determine if {@link JobStatus}es for jobs without a jobGroup/jobName should be filtered out.
- * Once a job has been orchestrated, {@link JobStatus}es without a jobGroup/jobName can be filtered out.
- * @param tableNames
- * @param tableName
- * @return
- */
- private boolean shouldFilterJobStatus(List<String> tableNames, String tableName) {
- return tableNames.size() > 1 && JobStatusRetriever.NA_KEY
- .equals(Splitter.on(KafkaJobStatusMonitor.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
- }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index e8611cf..e021af0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -123,8 +123,8 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
case TimingEvent.FlowTimings.FLOW_COMPILED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
break;
- case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
- properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+ case TimingEvent.FlowTimings.FLOW_RUNNING:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
break;
case TimingEvent.LauncherTimings.JOB_PENDING:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name());
@@ -136,10 +136,13 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
+ case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
+ case TimingEvent.FlowTimings.FLOW_FAILED:
+ case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
case TimingEvent.LauncherTimings.JOB_FAILED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 627e421..56289bd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -56,7 +56,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
try {
List<State> jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
- return filterAndGetJobStatuses(jobStatusStates);
+ return getJobStatuses(jobStatusStates);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -70,7 +70,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
try {
List<State> jobStatusStates = this.stateStore.getAll(storeName, tableName);
- return filterAndGetJobStatuses(jobStatusStates);
+ return getJobStatuses(jobStatusStates);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -91,13 +91,7 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
}
}
- private Iterator<JobStatus> filterAndGetJobStatuses(List<State> jobStatusStates) {
- int totalJobStatuses = jobStatusStates.size();
-
- return jobStatusStates.stream()
- .filter(state -> !(totalJobStatuses > 1 && state.getProp(JobStatusRetriever.EVENT_NAME_FIELD)
- .equals(ExecutionStatus.COMPILED.name())))
- .map(this::getJobStatus)
- .iterator();
+ private Iterator<JobStatus> getJobStatuses(List<State> jobStatusStates) {
+ return jobStatusStates.stream().map(this::getJobStatus).iterator();
}
}