You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/11/05 17:17:44 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1303] Catch error when failing to clean staging data

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fc978c  [GOBBLIN-1303] Catch error when failing to clean staging data
2fc978c is described below

commit 2fc978c1156633d8f5a6a752b590ee78ed757659
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Nov 5 09:17:36 2020 -0800

    [GOBBLIN-1303] Catch error when failing to clean staging data
    
    Closes #3141 from jack-moseley/cleanup-failure
---
 .../gobblin/runtime/AbstractJobLauncher.java       | 66 +++++++++++-----------
 1 file changed, 33 insertions(+), 33 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 0753f50..2ad588f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -559,43 +559,43 @@ public abstract class AbstractJobLauncher implements JobLauncher {
               }
             });
           }
-        }
-      }
 
-      for (JobState.DatasetState datasetState : this.jobContext.getDatasetStatesByUrns().values()) {
-        // Set the overall job state to FAILED if the job failed to process any dataset
-        if (datasetState.getState() == JobState.RunningState.FAILED) {
-          jobState.setState(JobState.RunningState.FAILED);
-          LOG.warn("At least one dataset state is FAILED. Setting job state to FAILED.");
-          break;
-        }
-      }
+          for (JobState.DatasetState datasetState : this.jobContext.getDatasetStatesByUrns().values()) {
+            // Set the overall job state to FAILED if the job failed to process any dataset
+            if (datasetState.getState() == JobState.RunningState.FAILED) {
+              jobState.setState(JobState.RunningState.FAILED);
+              LOG.warn("At least one dataset state is FAILED. Setting job state to FAILED.");
+              break;
+            }
+          }
 
-      notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
-        @Override
-        public void apply(JobListener jobListener, JobContext jobContext)
-            throws Exception {
-          jobListener.onJobCompletion(jobContext);
-        }
-      });
+          notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
+            @Override
+            public void apply(JobListener jobListener, JobContext jobContext)
+                throws Exception {
+              jobListener.onJobCompletion(jobContext);
+            }
+          });
 
-      if (jobState.getState() == JobState.RunningState.FAILED) {
-        notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
-          @Override
-          public void apply(JobListener jobListener, JobContext jobContext)
-              throws Exception {
-            jobListener.onJobFailure(jobContext);
-          }
-        });
-        throw new JobException(String.format("Job %s failed", jobId));
-      } else {
-        notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
-          @Override
-          public void apply(JobListener jobListener, JobContext jobContext)
-              throws Exception {
-            jobListener.onJobFailure(jobContext);
+          if (jobState.getState() == JobState.RunningState.FAILED) {
+            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
+              @Override
+              public void apply(JobListener jobListener, JobContext jobContext)
+                  throws Exception {
+                jobListener.onJobFailure(jobContext);
+              }
+            });
+            throw new JobException(String.format("Job %s failed", jobId));
+          } else {
+            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+              @Override
+              public void apply(JobListener jobListener, JobContext jobContext)
+                  throws Exception {
+                jobListener.onJobFailure(jobContext);
+              }
+            });
           }
-        });
+        }
       }
     } finally {
       // Stop metrics reporting