You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/13 17:20:44 UTC

incubator-gobblin git commit: [GOBBLIN-532] Add option to delete a job regardless of whether it succeeds or fails

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master be075c629 -> 3bc3d3691


[GOBBLIN-532] Add option to delete a job regardless of whether it succeeds or fails

Closes #2395 from yukuai518/deleteAlways


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3bc3d369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3bc3d369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3bc3d369

Branch: refs/heads/master
Commit: 3bc3d3691fafdc32128a7511ea2a69d5ba4ddc2e
Parents: be075c6
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Jul 13 10:20:38 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jul 13 10:20:38 2018 -0700

----------------------------------------------------------------------
 .../cluster/GobblinClusterConfigurationKeys.java        |  6 ++++++
 .../gobblin/cluster/GobblinHelixJobScheduler.java       | 12 ++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3bc3d369/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 648b5bc..b0badc4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -68,6 +68,8 @@ public class GobblinClusterConfigurationKeys {
   // Should job be executed in the scheduler thread?
   public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread";
   public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true;
+
+  // Helix related tagging
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags";
 
@@ -76,6 +78,10 @@ public class GobblinClusterConfigurationKeys {
   public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning.";
   public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey";
 
+  // Job spec operation: when true, remove the job from the job catalog even if running it fails
+  public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete";
+
+
   /**
    * A path pointing to a directory that contains job execution files to be executed by Gobblin. This directory can
    * have a nested structure.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3bc3d369/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index b991406..3f53c23 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -62,6 +62,7 @@ import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -403,6 +404,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
     @Override
     public void run() {
+      boolean isDeleted = false;
       try {
         ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
         ((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
@@ -412,11 +414,21 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
         if (GobblinHelixJobScheduler.this.jobCatalog != null) {
           try {
             GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri));
+            isDeleted = true;
           } catch (URISyntaxException e) {
             LOGGER.error("Failed to remove job with bad uri " + jobUri, e);
           }
         }
       } catch (JobException je) {
+        boolean alwaysDelete = PropertiesUtils
+            .getPropAsBoolean(this.jobConfig, GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, "false");
+        if (alwaysDelete && !isDeleted) {
+          try {
+            GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri));
+          } catch (URISyntaxException e) {
+            LOGGER.error("Always delete " + jobUri + ". Failed to remove job with bad uri " + jobUri, e);
+          }
+        }
         LOGGER.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
       }
     }