Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:29 UTC

[11/50] incubator-gobblin git commit: [GOBBLIN-398] Upgrade helix to 0.6.9

[GOBBLIN-398] Upgrade helix to 0.6.9

Upgrade the helix-core dependency from 0.6.7 to 0.6.9 and remove the
#HELIX-0.6.7-WORKAROUND code paths it obsoletes: the custom job-delete
logic in GobblinHelixTaskDriver and the forked GobblinJobRebalancer.
Streaming jobs now rely on JobConfig.Builder#setRebalanceRunningTask(true)
instead of swapping in a custom rebalancer, and queue deletion goes
through TaskDriver#delete, which sets the workflow's target state to
DELETE and lets the Helix controller clean up asynchronously (the job
launcher test now waits for that cleanup). Also bumps
dropwizardMetricsVersion from 3.1.0 to 3.2.3.

Closes #2272 from htran1/helix_069


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d29b72f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d29b72f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d29b72f4

Branch: refs/heads/0.12.0
Commit: d29b72f4997b4a435397353bb93a66ac4213d55e
Parents: ff13dde
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Jan 31 17:29:21 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 31 17:29:21 2018 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinClusterManager.java  |  15 +-
 .../cluster/GobblinHelixJobLauncher.java        |  72 +-
 .../gobblin/cluster/GobblinHelixTaskDriver.java | 296 +-------
 .../apache/helix/task/GobblinJobRebalancer.java | 713 -------------------
 .../cluster/GobblinHelixJobLauncherTest.java    |  22 +-
 gradle/scripts/computeVersions.gradle           |   2 +-
 gradle/scripts/dependencyDefinitions.gradle     |   2 +-
 7 files changed, 57 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------
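
The heart of the change, as a hedged before/after sketch drawn from the
hunks below (helixManager, queueName, and jobName are illustrative
stand-ins):

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;

    // Before (Helix 0.6.7 workaround): per-job delete through the forked
    // GobblinHelixTaskDriver to dodge the delete-while-IN_PROGRESS bug:
    //   new GobblinHelixTaskDriver(helixManager).deleteJob(queueName, jobName);
    // After (Helix 0.6.9): the stock TaskDriver requests deletion of the
    // whole queue and the controller cleans up asynchronously.
    static void cleanupQueue(HelixManager helixManager, String queueName) {
      new TaskDriver(helixManager).delete(queueName);
    }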


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 6b53c6c..77e511e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -60,8 +59,8 @@ import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -261,21 +260,17 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
 
         // Clean up existing jobs
         TaskDriver taskDriver = new TaskDriver(this.helixManager);
-        GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager);
         Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
 
         for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) {
           String queueName = entry.getKey();
           WorkflowConfig workflowConfig = entry.getValue();
 
-          for (String namespacedJobName : workflowConfig.getJobDag().getAllNodes()) {
-            String jobName = TaskUtil.getDenamespacedJobName(queueName, namespacedJobName);
-            LOGGER.info("job {} found for queue {} ", jobName, queueName);
+          // request delete if not already requested
+          if (workflowConfig.getTargetState() != TargetState.DELETE) {
+            taskDriver.delete(queueName);
 
-            // #HELIX-0.6.7-WORKAROUND
-            // working around 0.6.7 delete job issue for queues with IN_PROGRESS state
-            gobblinHelixTaskDriver.deleteJob(queueName, jobName);
-            LOGGER.info("deleted job {} from queue {}", jobName, queueName);
+            LOGGER.info("Requested delete of queue {}", queueName);
           }
         }
 

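Note that under 0.6.9, taskDriver.delete(queueName) above only requests
deletion: it flips the workflow's TargetState to DELETE and returns, and the
controller removes the queue asynchronously. A minimal sketch of a caller
that must observe completion, under that assumed contract (the 60-second
bound is illustrative):

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;

    // Poll until the workflow config disappears, which signals that the
    // controller has finished the asynchronous cleanup.
    static void awaitQueueRemoval(HelixManager manager, String queueName)
        throws InterruptedException {
      TaskDriver taskDriver = new TaskDriver(manager);
      for (int i = 0; i < 60 && taskDriver.getWorkflowConfig(queueName) != null; i++) {
        Thread.sleep(1000L);
      }
    }
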
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 1a39dfb..af15469 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -30,17 +30,14 @@ import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.task.GobblinJobRebalancer;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -170,36 +167,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
     this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
         this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir);
-
-    if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) {
-      // Fix-up Ideal State with a custom rebalancer that will re-balance long-running jobs
-      final String clusterName =
-          ConfigUtils.getString(jobConfig, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, "");
-      final String rebalancerToReplace = "org.apache.helix.task.JobRebalancer";
-      final String rebalancerClassDesired = GobblinJobRebalancer.class.getName();
-      final String jobResourceName = this.jobResourceName;
-
-      if (!clusterName.isEmpty()) {
-        this.helixManager.addIdealStateChangeListener(new IdealStateChangeListener() {
-          @Override
-          public void onIdealStateChange(List<IdealState> list, NotificationContext notificationContext) {
-            HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
-            for (String resource : helixAdmin.getResourcesInCluster(clusterName)) {
-              if (resource.equals(jobResourceName)) {
-                IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resource);
-                if (idealState != null) {
-                  String rebalancerClassFound = idealState.getRebalancerClassName();
-                  if (rebalancerToReplace.equals(rebalancerClassFound)) {
-                    idealState.setRebalancerClassName(rebalancerClassDesired);
-                    helixAdmin.setResourceIdealState(clusterName, resource, idealState);
-                  }
-                }
-              }
-            }
-          }
-        });
-      }
-    }
   }
 
   @Override
@@ -240,15 +207,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   protected void executeCancellation() {
     if (this.jobSubmitted) {
       try {
-        // #HELIX-0.6.7-WORKAROUND
-        // working around helix 0.6.7 job delete issue with custom taskDriver
-        LOGGER.info("Cancelling job {} in Helix", this.jobContext.getJobId());
-        GobblinHelixTaskDriver taskDriver = new GobblinHelixTaskDriver(this.helixManager);
-        taskDriver.deleteJob(this.helixQueueName, this.jobContext.getJobId());
-        LOGGER.info("Job {} in cancelled Helix", this.jobContext.getJobId());
-
-        taskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds);
-      } catch (InterruptedException | IllegalArgumentException e) {
+        this.helixTaskDriver.delete(this.helixQueueName);
+      } catch (IllegalArgumentException e) {
         LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
       }
     }
@@ -293,6 +253,11 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig,
         GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
         GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
+
+    if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) {
+      jobConfigBuilder.setRebalanceRunningTask(true);
+    }
+
     return jobConfigBuilder;
   }
 
@@ -300,18 +265,27 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
    * Submit a job to run.
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
+    WorkflowConfig workflowConfig = this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName);
+
+    // If the queue is present, but in delete state then wait for cleanup before recreating the queue
+    if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
+      GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager);
+      gobblinHelixTaskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds);
+      // if we get here then the workflow was successfully deleted
+      workflowConfig = null;
+    }
+
     // Create one queue for each job with the job name being the queue name
-    if (null == this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName)) {
-      JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build();
-      this.helixTaskDriver.createQueue(jobQueue);
-      LOGGER.info("Created job queue {}", this.helixQueueName);
+    if (workflowConfig == null) {
+        JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build();
+        this.helixTaskDriver.createQueue(jobQueue);
+        LOGGER.info("Created job queue {}", this.helixQueueName);
     } else {
       LOGGER.info("Job queue {} already exists", this.helixQueueName);
     }
 
     // Put the job into the queue
     this.helixTaskDriver.enqueueJob(this.jobContext.getJobName(), this.jobContext.getJobId(), jobConfigBuilder);
-
   }
 
   public void launchJob(@Nullable JobListener jobListener)

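The streaming fix-up above shrinks from an IdealState listener that swapped
in GobblinJobRebalancer to a single flag on the job configuration. A minimal
sketch of the new path (taskConcurrency and isStreamingJob are illustrative
stand-ins for values derived from the job config):

    import org.apache.helix.task.JobConfig;

    static JobConfig.Builder newJobConfigBuilder(int taskConcurrency,
        boolean isStreamingJob) {
      JobConfig.Builder builder = new JobConfig.Builder()
          .setNumConcurrentTasksPerInstance(taskConcurrency);
      if (isStreamingJob) {
        // Helix 0.6.9 can rebalance RUNNING tasks natively, replacing the
        // removed custom rebalancer swap.
        builder.setRebalanceRunningTask(true);
      }
      return builder;
    }
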
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
index cedb111..ebe2b52 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
@@ -16,54 +16,23 @@
  */
 package org.apache.gobblin.cluster;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TargetState;
-import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 
 /**
  * #HELIX-0.6.7-WORKAROUND
  * Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade
  */
 public class GobblinHelixTaskDriver {
-  /** For logging */
-  private static final Logger LOG = Logger.getLogger(GobblinHelixTaskDriver.class);
-
-  private final HelixDataAccessor _accessor;
-  private final ConfigAccessor _cfgAccessor;
-  private final HelixPropertyStore<ZNRecord> _propertyStore;
-  private final HelixAdmin _admin;
-  private final String _clusterName;
   private final TaskDriver _taskDriver;
 
   public GobblinHelixTaskDriver(HelixManager manager) {
@@ -71,277 +40,24 @@ public class GobblinHelixTaskDriver {
         .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
   }
 
-  public GobblinHelixTaskDriver(ZkClient client, String clusterName) {
-    this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
-  }
-
-  public GobblinHelixTaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
-    this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
-            PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName);
-  }
-
   public GobblinHelixTaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
       HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
-    _admin = admin;
-    _accessor = accessor;
-    _cfgAccessor = cfgAccessor;
-    _propertyStore = propertyStore;
-    _clusterName = clusterName;
     _taskDriver = new TaskDriver(admin, accessor, cfgAccessor, propertyStore, clusterName);
   }
 
   /**
-   * Delete a job from an existing named queue,
-   * the queue has to be stopped prior to this call
-   *
-   * @param queueName
-   * @param jobName
-   */
-  public void deleteJob(final String queueName, final String jobName) {
-    WorkflowConfig workflowCfg =
-        _taskDriver.getWorkflowConfig(queueName);
-
-    if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
-    }
-    if (workflowCfg.isTerminable()) {
-      throw new IllegalArgumentException(queueName + " is not a queue!");
-    }
-
-    boolean isRecurringWorkflow =
-        (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
-
-    if (isRecurringWorkflow) {
-      WorkflowContext wCtx = _taskDriver.getWorkflowContext(queueName);
-
-      String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
-      // delete the current scheduled one
-      deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
-
-      // Remove the job from the original queue template's DAG
-      removeJobFromDag(queueName, jobName);
-
-      // delete the ideal state and resource config for the template job
-      final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-      _admin.dropResource(_clusterName, namespacedJobName);
-
-      // Delete the job template from property store
-      String jobPropertyPath =
-          Joiner.on("/")
-              .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
-      _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
-    } else {
-      deleteJobFromScheduledQueue(queueName, jobName, false);
-    }
-  }
-
-  /**
-   * delete a job from a scheduled (non-recurrent) queue.
-   *
-   * @param queueName
-   * @param jobName
-   */
-  private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
-      boolean isRecurrent) {
-    WorkflowConfig workflowCfg = _taskDriver.getWorkflowConfig(queueName);
-
-    if (workflowCfg == null) {
-      // When try to delete recurrent job, it could be either not started or finished. So
-      // there may not be a workflow config.
-      if (isRecurrent) {
-        return;
-      } else {
-        throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
-      }
-    }
-
-    WorkflowContext wCtx = _taskDriver.getWorkflowContext(queueName);
-    if (wCtx != null && wCtx.getWorkflowState() == null) {
-      throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
-    }
-
-    // #HELIX-0.6.7-WORKAROUND
-    // This check is removed to get the same behavior as 0.6.6-SNAPSHOT until new APIs to support delete are provided
-    //String workflowState =
-    //    (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
-    //if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
-    //  throw new IllegalStateException("Queue " + queueName + " is still in progress!");
-    //}
-
-    removeJob(queueName, jobName);
-  }
-
-  private boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
-      String jobResource) {
-    return propertyStore.remove(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
-        AccessOption.PERSISTENT);
-  }
-
-  private void removeJob(String queueName, String jobName) {
-    // Remove the job from the queue in the DAG
-    removeJobFromDag(queueName, jobName);
-
-    // delete the ideal state and resource config for the job
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-    _admin.dropResource(_clusterName, namespacedJobName);
-
-    // update queue's property to remove job from JOB_STATES if it is already started.
-    removeJobStateFromQueue(queueName, jobName);
-
-    // Delete the job from property store
-    removeJobContext(_propertyStore, namespacedJobName);
-  }
-
-  /** Remove the job name from the DAG from the queue configuration */
-  private void removeJobFromDag(final String queueName, final String jobName) {
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
-          return null;
-        }
-        // Add the node to the existing DAG
-        JobDag jobDag = JobDag.fromJson(
-            currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-        Set<String> allNodes = jobDag.getAllNodes();
-        if (!allNodes.contains(namespacedJobName)) {
-          LOG.warn(
-              "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
-          return currentData;
-        }
-        String parent = null;
-        String child = null;
-        // remove the node from the queue
-        for (String node : allNodes) {
-          if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
-            parent = node;
-            jobDag.removeParentToChild(parent, namespacedJobName);
-          } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
-            child = node;
-            jobDag.removeParentToChild(namespacedJobName, child);
-          }
-        }
-        if (parent != null && child != null) {
-          jobDag.addParentToChild(parent, child);
-        }
-        jobDag.removeNode(namespacedJobName);
-
-        // Save the updated DAG
-        try {
-          currentData
-              .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
-        }
-        return currentData;
-      }
-    };
-
-    String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
-    if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
-      throw new IllegalArgumentException(
-          "Could not remove job " + jobName + " from DAG of queue " + queueName);
-    }
-  }
-
-  /** update queue's property to remove job from JOB_STATES if it is already started. */
-  private void removeJobStateFromQueue(final String queueName, final String jobName) {
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-    String queuePropertyPath =
-        Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
-
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
-          if (states != null && states.containsKey(namespacedJobName)) {
-            states.keySet().remove(namespacedJobName);
-          }
-        }
-        return currentData;
-      }
-    };
-    if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) {
-      LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
-    }
-  }
-
-  /**
-   * Trigger a controller pipeline execution for a given resource.
-   *
-   * @param accessor Helix data accessor
-   * @param resource the name of the resource changed to triggering the execution
-   */
-  private void invokeRebalance(HelixDataAccessor accessor, String resource) {
-    // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
-    LOG.info("invoke rebalance for " + resource);
-    PropertyKey key = accessor.keyBuilder().idealStates(resource);
-    IdealState is = accessor.getProperty(key);
-    if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
-      if (!accessor.updateProperty(key, is)) {
-        LOG.warn("Failed to invoke rebalance on resource " + resource);
-      }
-    } else {
-      LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource);
-    }
-  }
-
-  /** Helper function to change target state for a given workflow */
-  private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
-    LOG.info("Set " + workflowName + " to target state " + state);
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          // Only update target state for non-completed workflows
-          String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
-          if (finishTime == null || finishTime.equals(String.valueOf(WorkflowContext.UNFINISHED))) {
-            currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
-                state.name());
-          } else {
-            LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
-          }
-        } else {
-          LOG.error("TargetState DataUpdater: Fails to update target state ");
-        }
-        return currentData;
-      }
-    };
-    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
-    List<String> paths = Lists.newArrayList();
-
-    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
-    if (_accessor.getProperty(cfgKey) != null) {
-      paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
-      updaters.add(updater);
-      _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      invokeRebalance(_accessor, workflowName);
-    } else {
-      LOG.error("Configuration path " + cfgKey + " not found!");
-    }
-  }
-
-  /**
    * Delete the workflow
    *
    * @param workflow  The workflow name
    * @param timeout   The timeout for deleting the workflow/queue in seconds
    */
   public void deleteWorkflow(String workflow, long timeout) throws InterruptedException {
-    // #HELIX-0.6.7-WORKAROUND
-    // Helix 0.6.7 has a bug where TaskDriver.delete(workflow) will delete all resources with a
-    // workflow as the prefix. Work around the bug by pulling in the code from TaskDriver and calling
-    // setSingleWorkflowTargetState directly to bypass the prefix matching code.
-    setSingleWorkflowTargetState(workflow, TargetState.DELETE);
+    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(workflow);
+
+    // set the target state if not already set
+    if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) {
+      _taskDriver.delete(workflow);
+    }
 
     long endTime = System.currentTimeMillis() + (timeout * 1000);
 

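The hunk cuts off before the unchanged remainder of deleteWorkflow. Based on
the endTime computation above and the InterruptedException in the method
signature, the presumed shape of the wait loop is roughly as follows (an
assumption, not shown in this diff):

    // Poll until the controller finishes the requested delete, or give up
    // once the timeout elapses.
    while (System.currentTimeMillis() <= endTime) {
      if (_taskDriver.getWorkflowConfig(workflow) == null
          && _taskDriver.getWorkflowContext(workflow) == null) {
        return;  // cleanup finished
      }
      Thread.sleep(1000L);
    }
    throw new HelixException("Failed to delete workflow/queue " + workflow);
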
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java b/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java
deleted file mode 100644
index 25241d5..0000000
--- a/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.task;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-/**
- * Custom rebalancer implementation for the {@code Job} in task model.
- */
-public class GobblinJobRebalancer extends TaskRebalancer {
-  private static final Logger LOG = Logger.getLogger(GobblinJobRebalancer.class);
-  private static TaskAssignmentCalculator fixTaskAssignmentCal =
-      new FixedTargetTaskAssignmentCalculator();
-  private static TaskAssignmentCalculator genericTaskAssignmentCal =
-      new GenericTaskAssignmentCalculator();
-
-  private static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
-  @Override
-  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
-    final String jobName = resource.getResourceName();
-    LOG.debug("Computer Best Partition for job: " + jobName);
-
-    // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
-    if (jobCfg == null) {
-      LOG.error("Job configuration is NULL for " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-    String workflowResource = jobCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
-    if (workflowCfg == null) {
-      LOG.error("Workflow configuration is NULL for " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
-    if (workflowCtx == null) {
-      LOG.error("Workflow context is NULL for " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState != TargetState.START && targetState != TargetState.STOP) {
-      LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
-          + ".Stop scheduling job " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    // Stop current run of the job if workflow or job is already in final state (failed or completed)
-    TaskState workflowState = workflowCtx.getWorkflowState();
-    TaskState jobState = workflowCtx.getJobState(jobName);
-    // The job is already in a final state (completed/failed).
-    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED ||
-        jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
-      LOG.info(String.format(
-          "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
-          workflowResource, jobName, workflowState, jobState));
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
-      _scheduledRebalancer.removeScheduledRebalance(jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
-        workflowCtx)) {
-      LOG.info("Job is not ready to run " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    // Fetch any existing context information from the property store.
-    JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName);
-    if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord("TaskContext"));
-      jobCtx.setStartTime(System.currentTimeMillis());
-    }
-
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(jobName);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
-        ? clusterData.getAllEnabledLiveInstances()
-        : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
-
-    if (liveInstances.isEmpty()) {
-      LOG.error("No available instance found for job!");
-    }
-
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-    ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances,
-            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
-
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        taskIs.getRecord().getMapFields().remove(pName(jobName, pId));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update rebalancer context, previous ideal state.
-    TaskUtil.setJobContext(_manager, jobName, jobCtx);
-    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
-    setPrevResourceAssignment(jobName, newAssignment);
-
-    LOG.debug("Job " + jobName + " new assignment " + Arrays
-        .toString(newAssignment.getMappedPartitions().toArray()));
-    return newAssignment;
-  }
-
-  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
-      WorkflowConfig workflowCfg) {
-    Set<String> ret = new HashSet<String>();
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      if (jobName.equals(currentJobName)) {
-        continue;
-      }
-      JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
-      if (jobContext == null) {
-        continue;
-      }
-      for (int partition : jobContext.getPartitionSet()) {
-        TaskPartitionState partitionState = jobContext.getPartitionState(partition);
-        if (partitionState == TaskPartitionState.INIT ||
-            partitionState == TaskPartitionState.RUNNING) {
-          ret.add(jobContext.getAssignedParticipant(partition));
-        }
-      }
-    }
-
-    return ret;
-  }
-
-  private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
-      ClusterDataCache cache) {
-    TargetState jobTgtState = workflowConfig.getTargetState();
-    // Update running status in workflow context
-    if (jobTgtState == TargetState.STOP) {
-      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
-      // Workflow has been stopped if all in progress jobs are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Used to keep track of tasks that have failed, but whose failure is acceptable
-    Set<Integer> skippedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
-    // Keeps a mapping of (partition) -> (instance, state) of partitions have have been relocated
-    Map<Integer, PartitionAssignment> relocatedPaMap = new TreeMap<Integer, PartitionAssignment>();
-
-    Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
-
-    // Process all the current assignments of tasks.
-    TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
-    Set<Integer> allPartitions = taskAssignmentCal
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates());
-
-    if (allPartitions == null || allPartitions.isEmpty()) {
-      // Empty target partitions, mark the job as FAILED.
-      String failureMsg = "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
-      LOG.info(failureMsg);
-      jobCtx.setInfo(failureMsg);
-      markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
-      markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
-      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
-      return new ResourceAssignment(jobResource);
-    }
-
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
-    long currentTime = System.currentTimeMillis();
-
-    LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments
-        + " excludedInstances: " + excludedInstances);
-
-    for (Map.Entry<String, SortedSet<Integer>> entryInstance : taskAssignments.entrySet()) {
-      String instance = entryInstance.getKey();
-      if (excludedInstances.contains(instance)) {
-        continue;
-      }
-
-      Set<Integer> pSet = entryInstance.getValue();
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(jobResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null) {
-          // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-          if (stateMap != null) {
-            String prevState = stateMap.get(instance);
-            paMap.put(pId, new PartitionAssignment(instance, prevState));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-                  pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
-                pName), instance));
-        jobCtx.setPartitionState(pId, currState);
-
-        String taskMsg = currStateOutput.getInfo(jobResource, new Partition(
-            pName), instance);
-        if (taskMsg != null) {
-          jobCtx.setPartitionInfo(pId, taskMsg);
-        }
-
-        // Process any requested state transitions.
-        String requestedStateStr =
-            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format(
-              "Instance %s requested a state transition to %s for partition %s.", instance,
-              requestedState, pName));
-          continue;
-        }
-
-        switch (currState) {
-          case RUNNING:
-          case STOPPED: {
-            TaskPartitionState nextState;
-            if (jobTgtState == TargetState.START) {
-              nextState = TaskPartitionState.RUNNING;
-            } else {
-              nextState = TaskPartitionState.STOPPED;
-            }
-
-            paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-            assignedPartitions.add(pId);
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                nextState, instance));
-          }
-          break;
-          case COMPLETED: {
-            // The task has completed on this partition. Mark as such in the context object.
-            donePartitions.add(pId);
-            LOG.debug(String
-                .format(
-                    "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                    pName, currState));
-            partitionsToDropFromIs.add(pId);
-            markPartitionCompleted(jobCtx, pId);
-          }
-          break;
-          case TIMED_OUT:
-          case TASK_ERROR:
-          case TASK_ABORTED:
-          case ERROR: {
-            donePartitions.add(pId); // The task may be rescheduled on a different instance.
-            LOG.debug(String.format(
-                "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName,
-                currState, taskMsg));
-            markPartitionError(jobCtx, pId, currState, true);
-            // The error policy is to fail the task as soon a single partition fails for a specified
-            // maximum number of attempts or task is in ABORTED state.
-            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() ||
-                currState.equals(TaskPartitionState.TASK_ABORTED)) {
-              // If we have some leeway for how many tasks we can fail, then we don't have
-              // to fail the job immediately
-              if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) {
-                markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
-                _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
-                markAllPartitionsError(jobCtx, currState, false);
-                addAllPartitions(allPartitions, partitionsToDropFromIs);
-
-                // remove IdealState of this job
-                cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
-                return buildEmptyAssignment(jobResource, currStateOutput);
-              } else {
-                skippedPartitions.add(pId);
-                partitionsToDropFromIs.add(pId);
-              }
-
-              LOG.debug("skippedPartitions:" + skippedPartitions);
-            } else {
-              // Mark the task to be started at some later time (if enabled)
-              markPartitionDelayed(jobCfg, jobCtx, pId);
-            }
-          }
-          break;
-          case INIT:
-          case DROPPED: {
-            // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-            donePartitions.add(pId);
-            LOG.debug(String.format(
-                "Task partition %s has state %s. It will be dropped from the current ideal state.",
-                pName, currState));
-          }
-          break;
-          default:
-            throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
-    scheduleForNextTask(jobResource, jobCtx, currentTime);
-
-    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
-      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
-      // remove IdealState of this job
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
-    }
-
-    // Make additional task assignments if needed.
-    if (jobTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from consideration when making
-      // any new assignments.
-      // This includes all completed, failed, delayed, and already assigned partitions.
-      //Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      //HACK: Modify excludeSet to be empty
-      Set<Integer> excludeSet = Sets.newTreeSet();
-      addCompletedTasks(excludeSet, jobCtx, allPartitions);
-      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
-      excludeSet.addAll(skippedPartitions);
-      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
-          .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
-            .contains(instance)) {
-          continue;
-        }
-
-        // Contains the set of task partitions currently assigned to the instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            // if partition is not currently assigned to instance then it may have been moved
-            if (!pSet.contains(pId)) {
-              // look at current assignment to see if task is running on another instance
-              for (Map.Entry<String, SortedSet<Integer>> currentEntry : taskAssignments.entrySet()) {
-                String currentInstance = currentEntry.getKey();
-                Set<Integer> currentpSet = currentEntry.getValue();
-
-                // task is being moved, so transition to STOPPED state
-                if (!instance.equals(currentInstance) && currentpSet.contains(pId)) {
-                  relocatedPaMap.put(pId, new PartitionAssignment(currentInstance, TaskPartitionState.STOPPED.name()));
-                  break;
-                }
-              }
-            }
-
-            String pName = pName(jobResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            jobCtx.setAssignedParticipant(pId, instance);
-            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-            jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(jobResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-
-      if (relocatedPaMap.containsKey(e.getKey())) {
-        PartitionAssignment currentPa = relocatedPaMap.get(e.getKey());
-
-        ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
-            ImmutableMap.of(pa._instance, pa._state, currentPa._instance, currentPa._state));
-      } else {
-        ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())), ImmutableMap.of(pa._instance, pa._state));
-      }
-    }
-
-    return ra;
-  }
-
-  private void markJobComplete(String jobName, JobContext jobContext,
-      WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
-    long currentTime = System.currentTimeMillis();
-    workflowContext.setJobState(jobName, TaskState.COMPLETED);
-    jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig)) {
-      workflowContext.setFinishTime(currentTime);
-    }
-  }
-
-  private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
-    // Clear current entries if they exist and are expired
-    long currentTime = now;
-    long scheduledTime = _scheduledRebalancer.getRebalanceTime(job);
-    if (scheduledTime > 0 && currentTime > scheduledTime) {
-      _scheduledRebalancer.removeScheduledRebalance(job);
-    }
-
-    // Figure out the earliest schedulable time in the future of a non-complete job
-    boolean shouldSchedule = false;
-    long earliestTime = Long.MAX_VALUE;
-    for (int p : jobCtx.getPartitionSet()) {
-      long retryTime = jobCtx.getNextRetryTime(p);
-      TaskPartitionState state = jobCtx.getPartitionState(p);
-      state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
-      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
-        earliestTime = retryTime;
-        shouldSchedule = true;
-      }
-    }
-
-    // If any was found, then schedule it
-    if (shouldSchedule) {
-      _scheduledRebalancer.scheduleRebalance(_manager, job, earliestTime);
-    }
-  }
-
-  /**
-   * Get the last task assignment for a given job
-   *
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  private ResourceAssignment getPrevResourceAssignment(String resourceName) {
-    ZNRecord r = _manager.getHelixPropertyStore()
-        .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-            null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   *
-   * @param resourceName the name of the job
-   * @param ra           {@link ResourceAssignment} containing the task assignment
-   */
-  private void setPrevResourceAssignment(String resourceName,
-      ResourceAssignment ra) {
-    _manager.getHelixPropertyStore()
-        .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-            ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Checks if the job has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @param skippedPartitions partitions that failed, but whose failure is acceptable
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
-      Set<Integer> skippedPartitions, JobConfig cfg) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
-          && !isTaskGivenup(ctx, cfg, pId)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-
-  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
-    for (Integer pId : toAdd) {
-      destination.add(pId);
-    }
-  }
-
-  private static void addCompletedTasks(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
-    TaskPartitionState state = ctx.getPartitionState(pId);
-    if ((state != null) && (state.equals(TaskPartitionState.TASK_ABORTED) || state
-        .equals(TaskPartitionState.ERROR))) {
-      return true;
-    }
-    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-  }
-
-  // add all partitions that have been tried maxNumberAttempts
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
-
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
-    }
-
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
-  }
-
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
-  }
-
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
-  }
-
-  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
-
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (includeSet.contains(pId)) {
-        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-    return result;
-  }
-
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
-      }
-    }
-    return nonReadyPartitions;
-  }
-
-  private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
-      return genericTaskAssignmentCal;
-    } else {
-      return fixTaskAssignmentCal;
-    }
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  private String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
-
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index aaf5f05..10ef3db 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -307,6 +307,9 @@ public class GobblinHelixJobLauncherTest {
 
     gobblinHelixJobLauncher.close();
 
+    // job queue deleted asynchronously after close
+    waitForQueueCleanup(taskDriver, jobName);
+
     jobContext = taskDriver.getJobContext(jobContextName);
 
     // job context should have been deleted
@@ -325,7 +328,9 @@ public class GobblinHelixJobLauncherTest {
 
     gobblinHelixJobLauncher2.close();
 
-    // job queue deleted after close
+    // job queue deleted asynchronously after close
+    waitForQueueCleanup(taskDriver, jobName2);
+
     workflowConfig  = taskDriver.getWorkflowConfig(jobName2);
     Assert.assertNull(workflowConfig);
 
@@ -358,4 +363,19 @@ public class GobblinHelixJobLauncherTest {
       this.closer.close();
     }
   }
+
+   private void waitForQueueCleanup(TaskDriver taskDriver, String queueName) {
+     for (int i = 0; i < 60; i++) {
+       WorkflowConfig workflowConfig  = taskDriver.getWorkflowConfig(queueName);
+
+       if (workflowConfig == null) {
+         break;
+       }
+
+       try {
+         Thread.sleep(1000);
+       } catch (InterruptedException e) {
+       }
+     }
+  }
 }
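
The helper added above polls for up to 60 seconds and swallows
InterruptedException, which keeps waiting even if the test thread is
interrupted. A hypothetical variant that preserves the interrupt status
instead (not part of this commit):

    private void waitForQueueCleanup(TaskDriver taskDriver, String queueName) {
      for (int i = 0; i < 60; i++) {
        if (taskDriver.getWorkflowConfig(queueName) == null) {
          return;  // queue fully cleaned up
        }
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();  // restore flag and stop waiting
          return;
        }
      }
    }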

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gradle/scripts/computeVersions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/computeVersions.gradle b/gradle/scripts/computeVersions.gradle
index 2c05350..cf7baaa 100644
--- a/gradle/scripts/computeVersions.gradle
+++ b/gradle/scripts/computeVersions.gradle
@@ -62,7 +62,7 @@ ext.gradleVersionMajor = Integer.parseInt(gradleVersions[0])
 ext.gradleVersionMinor = Integer.parseInt(gradleVersions[1])
 println "Detected Gradle version major=" + gradleVersionMajor + " minor=" + gradleVersionMinor
 
-ext.dropwizardMetricsVersion = '3.1.0'
+ext.dropwizardMetricsVersion = '3.2.3'
 ext.findBugsVersion = '3.0.0'
 ext.googleVersion = '1.22.0'
 ext.slf4jVersion = '1.7.21'

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index d138a55..ad1c1cf 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -61,7 +61,7 @@ ext.externalDependency = [
     "hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
     "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
-    "helix": "org.apache.helix:helix-core:0.6.7",
+    "helix": "org.apache.helix:helix-core:0.6.9",
     "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
     "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
     "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,