Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/01/08 22:10:01 UTC

[jira] [Work logged] (GOBBLIN-1342) Add API to resume a flow in gaas

     [ https://issues.apache.org/jira/browse/GOBBLIN-1342?focusedWorklogId=533266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-533266 ]

ASF GitHub Bot logged work on GOBBLIN-1342:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jan/21 22:09
            Start Date: 08/Jan/21 22:09
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on a change in pull request #3179:
URL: https://github.com/apache/incubator-gobblin/pull/3179#discussion_r554219344



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -462,6 +501,64 @@ public void run() {
       }
     }
 
+    /**
+     * Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point where it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that the flow SLA is computed from the resume time instead of the original flow start time
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+          return;
+        }
+
+        for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
+          JobStatus jobStatus = pollJobStatus(node);
+          if (jobStatus == null || jobStatus.getEventName().equals(FAILED.name()) || jobStatus.getEventName().equals(CANCELLED.name())) {
+            return;

Review comment:
       Same comment as earlier.

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -462,6 +501,64 @@ public void run() {
       }
     }
 
+    /**
+     * Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point where it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that the flow SLA is computed from the resume time instead of the original flow start time
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (!flowStatus.getEventName().equals(PENDING_RESUME.name())) {

Review comment:
       Possible NPE here. Is there a possibility that flowStatus is null?
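
A minimal sketch of the guard this comment seems to ask for, assuming pollFlowStatus can return null when the state store has no record for the flow yet (JobStatus, pollFlowStatus and PENDING_RESUME come from the quoted diff; whether to bail out of the whole loop or just skip this dag is raised in the next comment):

      JobStatus flowStatus = pollFlowStatus(dag.getValue());
      // Guard against a missing status record before dereferencing it; putting the
      // constant on the left of equals() also tolerates a null event name.
      if (flowStatus == null || !PENDING_RESUME.name().equals(flowStatus.getEventName())) {
        return;
      }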

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -462,6 +501,64 @@ public void run() {
       }
     }
 
+    /**
+     * Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point where it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that the flow SLA is computed from the resume time instead of the original flow start time
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+          return;

Review comment:
       Should this be continue instead of return? Don't you want to process the remaining dags?
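
A minimal sketch of the loop shape this comment seems to suggest, assuming the intent is to skip only the dag that is not yet ready and keep processing the rest. The dagLoop label and the entry variable name are illustrative; the other identifiers come from the quoted diff. Extracting the per-dag checks into a helper method that returns a boolean would avoid the label, if that reads better.

      dagLoop:
      for (Map.Entry<String, Dag<JobExecutionPlan>> entry : this.resumingDags.entrySet()) {
        JobStatus flowStatus = pollFlowStatus(entry.getValue());
        if (flowStatus == null || !PENDING_RESUME.name().equals(flowStatus.getEventName())) {
          continue;           // this dag is not ready yet; move on to the remaining dags
        }
        for (DagNode<JobExecutionPlan> node : entry.getValue().getNodes()) {
          JobStatus jobStatus = pollJobStatus(node);
          if (jobStatus == null || FAILED.name().equals(jobStatus.getEventName())
              || CANCELLED.name().equals(jobStatus.getEventName())) {
            continue dagLoop; // a job status has not caught up yet; retry this dag on the next pass
          }
        }
        // ... proceed to re-initialize this dag (the initialize call described in the javadoc)
      }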




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 533266)
    Time Spent: 20m  (was: 10m)

> Add API to resume a flow in gaas
> --------------------------------
>
>                 Key: GOBBLIN-1342
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1342
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Jack Moseley
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)