You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/12 01:58:34 UTC
incubator-gobblin git commit: [GOBBLIN-369] Clean up the helix job queue after the job execution is…
Repository: incubator-gobblin
Updated Branches:
refs/heads/master d0784cad9 -> 1d0ec852c
[GOBBLIN-369] Clean up the helix job queue after the job execution is…
Closes #2244 from htran1/helix_queue_cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1d0ec852
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1d0ec852
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1d0ec852
Branch: refs/heads/master
Commit: 1d0ec852c84037ac7c8f24f6694f4757dae21a00
Parents: d0784ca
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Jan 11 17:58:09 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Jan 11 17:58:17 2018 -0800
----------------------------------------------------------------------
.../GobblinClusterConfigurationKeys.java | 3 ++
.../cluster/GobblinHelixJobLauncher.java | 8 +++++-
.../gobblin/cluster/GobblinHelixTaskDriver.java | 29 ++++++++++++++++++++
.../cluster/GobblinHelixJobLauncherTest.java | 9 ++++++
4 files changed, 48 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8fd9bfd..5e25194 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -82,4 +82,7 @@ public class GobblinClusterConfigurationKeys {
public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds";
public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
+
+ public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds";
+ public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 73e5330..79f3b9e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -126,6 +126,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private final ConcurrentHashMap<String, Boolean> runningMap;
private final StateStores stateStores;
private final Config jobConfig;
+ private final long jobQueueDeleteTimeoutSeconds;
public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap)
@@ -150,6 +151,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfig = ConfigUtils.propertiesToConfig(jobProps);
+ this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(
new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(),
@@ -240,7 +244,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
GobblinHelixTaskDriver taskDriver = new GobblinHelixTaskDriver(this.helixManager);
taskDriver.deleteJob(this.helixQueueName, this.jobContext.getJobId());
LOGGER.info("Job {} in cancelled Helix", this.jobContext.getJobId());
- } catch (IllegalArgumentException e) {
+
+ taskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds);
+ } catch (InterruptedException | IllegalArgumentException e) {
LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
index bb5c551..a39c5ca 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
@@ -26,6 +26,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
@@ -268,4 +269,32 @@ public class GobblinHelixTaskDriver {
LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
}
}
+
+ /**
+ * Requests deletion of the given workflow and waits until its workflow context is no longer present.
+ * Sleeps one second between checks; throws {@link HelixException} if the context still exists at the timeout.
+ *
+ * @param workflow The name of the workflow/queue to delete
+ * @param timeout The maximum time, in seconds, to wait for the deletion to complete
+ */
+ public void deleteWorkflow(String workflow, long timeout) throws InterruptedException {
+ _taskDriver.delete(workflow);
+
+ long endTime = System.currentTimeMillis() + (timeout * 1000);
+
+ // Poll until the workflow context is gone or the deadline (endTime) has passed
+ while (System.currentTimeMillis() <= endTime) {
+ WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow);
+
+ if (workflowContext != null) {
+ Thread.sleep(1000);
+ } else {
+ // Context is null: the deletion has completed
+ return;
+ }
+ }
+
+ // Deadline passed while the context was still present on the last check
+ throw new HelixException(String
+ .format("Fail to delete the workflow/queue %s within %d seconds.", workflow, timeout));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 243c652..cc327fc 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -40,6 +40,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -303,6 +305,13 @@ public class GobblinHelixJobLauncherTest {
// job context should have been deleted
Assert.assertNull(jobContext);
+ // job queue should have been deleted
+ WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(jobName);
+ Assert.assertNull(workflowConfig);
+
+ WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName);
+ Assert.assertNull(workflowContext);
+
// check that workunit and taskstate directory for the job are cleaned up
final File workunitsDir =
new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME