You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/12 01:58:34 UTC

incubator-gobblin git commit: [GOBBLIN-369] Clean up the helix job queue after the job execution is…

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d0784cad9 -> 1d0ec852c


[GOBBLIN-369] Clean up the helix job queue after the job execution is…

Closes #2244 from htran1/helix_queue_cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1d0ec852
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1d0ec852
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1d0ec852

Branch: refs/heads/master
Commit: 1d0ec852c84037ac7c8f24f6694f4757dae21a00
Parents: d0784ca
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Jan 11 17:58:09 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Jan 11 17:58:17 2018 -0800

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |  3 ++
 .../cluster/GobblinHelixJobLauncher.java        |  8 +++++-
 .../gobblin/cluster/GobblinHelixTaskDriver.java | 29 ++++++++++++++++++++
 .../cluster/GobblinHelixJobLauncherTest.java    |  9 ++++++
 4 files changed, 48 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8fd9bfd..5e25194 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -82,4 +82,7 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds";
   public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
+
+  public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds";
+  public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 73e5330..79f3b9e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -126,6 +126,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   private final ConcurrentHashMap<String, Boolean> runningMap;
   private final StateStores stateStores;
   private final Config jobConfig;
+  private final long jobQueueDeleteTimeoutSeconds;
 
   public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap)
@@ -150,6 +151,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
     jobConfig = ConfigUtils.propertiesToConfig(jobProps);
 
+    this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(
             new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(),
@@ -240,7 +244,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
         GobblinHelixTaskDriver taskDriver = new GobblinHelixTaskDriver(this.helixManager);
         taskDriver.deleteJob(this.helixQueueName, this.jobContext.getJobId());
         LOGGER.info("Job {} in cancelled Helix", this.jobContext.getJobId());
-      } catch (IllegalArgumentException e) {
+
+        taskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds);
+      } catch (InterruptedException | IllegalArgumentException e) {
         LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
index bb5c551..a39c5ca 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
@@ -26,6 +26,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
@@ -268,4 +269,32 @@ public class GobblinHelixTaskDriver {
       LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
     }
   }
+
+  /**
+   * Delete the workflow
+   *
+   * @param workflow  The workflow name
+   * @param timeout   The timeout for deleting the workflow/queue in seconds
+   */
+  public void deleteWorkflow(String workflow, long timeout) throws InterruptedException {
+    _taskDriver.delete(workflow);
+
+    long endTime = System.currentTimeMillis() + (timeout * 1000);
+
+    // check for completion of deletion request
+    while (System.currentTimeMillis() <= endTime) {
+      WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow);
+
+      if (workflowContext != null) {
+        Thread.sleep(1000);
+      } else {
+        // Successfully deleted
+        return;
+      }
+    }
+
+    // Failed to complete deletion within timeout
+    throw new HelixException(String
+        .format("Fail to delete the workflow/queue %s within %d seconds.", workflow, timeout));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1d0ec852/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 243c652..cc327fc 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -40,6 +40,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -303,6 +305,13 @@ public class GobblinHelixJobLauncherTest {
     // job context should have been deleted
     Assert.assertNull(jobContext);
 
+    // job queue should have been deleted
+    WorkflowConfig workflowConfig  = taskDriver.getWorkflowConfig(jobName);
+    Assert.assertNull(workflowConfig);
+
+    WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName);
+    Assert.assertNull(workflowContext);
+
     // check that workunit and taskstate directory for the job are cleaned up
     final File workunitsDir =
         new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME