You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/13 17:20:44 UTC
incubator-gobblin git commit: [GOBBLIN-532] Add option to delete job no matter if it is successful or not
Repository: incubator-gobblin
Updated Branches:
refs/heads/master be075c629 -> 3bc3d3691
[GOBBLIN-532] Add option to delete job no matter if it is successful or not
Closes #2395 from yukuai518/deleteAlways
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3bc3d369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3bc3d369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3bc3d369
Branch: refs/heads/master
Commit: 3bc3d3691fafdc32128a7511ea2a69d5ba4ddc2e
Parents: be075c6
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Jul 13 10:20:38 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jul 13 10:20:38 2018 -0700
----------------------------------------------------------------------
.../cluster/GobblinClusterConfigurationKeys.java | 6 ++++++
.../gobblin/cluster/GobblinHelixJobScheduler.java | 12 ++++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3bc3d369/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 648b5bc..b0badc4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -68,6 +68,8 @@ public class GobblinClusterConfigurationKeys {
// Should job be executed in the scheduler thread?
public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread";
public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true;
+
+ // Helix related tagging
public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag";
public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags";
@@ -76,6 +78,10 @@ public class GobblinClusterConfigurationKeys {
public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning.";
public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey";
+ // job spec operation
+ public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete";
+
+
/**
* A path pointing to a directory that contains job execution files to be executed by Gobblin. This directory can
* have a nested structure.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3bc3d369/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index b991406..3f53c23 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -62,6 +62,7 @@ import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -403,6 +404,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
@Override
public void run() {
+ boolean isDeleted = false;
try {
((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
@@ -412,11 +414,21 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
if (GobblinHelixJobScheduler.this.jobCatalog != null) {
try {
GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri));
+ isDeleted = true;
} catch (URISyntaxException e) {
LOGGER.error("Failed to remove job with bad uri " + jobUri, e);
}
}
} catch (JobException je) {
+ boolean alwaysDelete = PropertiesUtils
+ .getPropAsBoolean(this.jobConfig, GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, "false");
+ if (alwaysDelete && !isDeleted) {
+ try {
+ GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri));
+ } catch (URISyntaxException e) {
+ LOGGER.error("Always delete " + jobUri + ". Failed to remove job with bad uri " + jobUri, e);
+ }
+ }
LOGGER.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
}
}