You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "Will-Lo (via GitHub)" <gi...@apache.org> on 2023/02/14 19:23:57 UTC

[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3641: [GOBBLIN-1784] Only clean dags from the dag manager if a flow event is received

Will-Lo commented on code in PR #3641:
URL: https://github.com/apache/gobblin/pull/3641#discussion_r1106274419


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1117,67 +1107,90 @@ private boolean hasRunningJobs(String dagId) {
      * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
      */
     private void cleanUp() {
-      List<String> dagIdstoClean = new ArrayList<>();
-      //Clean up failed dags
-      for (String dagId : this.failedDagIdsFinishRunning) {
-        //Skip monitoring of any other jobs of the failed dag.
-        LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
-        while (!dagNodeList.isEmpty()) {
-          DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
-          deleteJobState(dagId, dagNode);
-        }
-        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
-        String status = TimingEvent.FlowTimings.FLOW_FAILED;
-        addFailedDag(dagId, dag);
-        log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
-        // send an event before cleaning up dag
-        DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
-        dagIdstoClean.add(dagId);
-      }
+      // Approximate the time when the flow events are emitted to account for delay when the flow event is received by the job monitor
+      long cleanUpProcessingTime = System.currentTimeMillis();
 
       // Remove dags that are finished and emit their appropriate metrics
       for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair : this.dags.entrySet()) {
         String dagId = dagIdKeyPair.getKey();
+        // On service restart, we repopulate the dags that are waiting to be cleaned up
+        if (dagIdstoClean.contains(dagId)) {
+          continue;
+        }
         Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
-        if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
-          String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
-          if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            status = TimingEvent.FlowTimings.FLOW_FAILED;
-            addFailedDag(dagId, dag);
-            this.failedDagIdsFinishAllPossible.remove(dagId);
+        if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) || TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+            DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
+          //Skip monitoring of any other jobs of the failed dag.
+          LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
+          while (!dagNodeList.isEmpty()) {
+            DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+            deleteJobState(dagId, dagNode);
+          }
+        }
+        if (!hasRunningJobs(dagId)) {
+          // Collect all the dagIds that are finished
+          this.dagIdstoClean.add(dagId);
+          if (dag.getFlowEvent() == null) {
+            // If the dag flow event is not set, then it is successful
+            dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
           } else {
-            dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+            addFailedDag(dagId, dag);
           }
-          log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
+          log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, dag.getFlowEvent());
           // send an event before cleaning up dag
-          DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
-          dagIdstoClean.add(dagId);
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), dag.getFlowEvent());
+          dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
         }
       }
 
-      for (String dagId: dagIdstoClean) {
-        cleanUpDag(dagId);
+      // Only clean up dags after the job status monitor processed the flow event
+      Set<String> cleanedDags = new HashSet<>();
+      for (String dagId: this.dagIdstoClean) {
+        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+        JobStatus flowStatus = pollFlowStatus(dag);
+        if (flowStatus != null && FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+          FlowId flowId = DagManagerUtils.getFlowId(dag);
+          switch(dag.getFlowEvent()) {
+            case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+              this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, FlowState.SUCCESSFUL);
+              break;
+            case TimingEvent.FlowTimings.FLOW_FAILED:
+              this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, FlowState.FAILED);
+              break;
+            case TimingEvent.FlowTimings.FLOW_CANCELLED:
+              this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, FlowState.FAILED);
+              break;
+            default:
+              log.warn("Unexpected flow event {} for dag {}", dag.getFlowEvent(), dagId);
+          }
+          cleanUpDag(dagId);
+          cleanedDags.add(dagId);

Review Comment:
   There's a concurrent modification exception here because of the way the dag IDs are stored; could we store them in a map instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org