Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 06:01:26 UTC

[3/3] helix git commit: Clean up TaskDriver, move all command-line support code to a new tool class TaskAdmin.java, and build a command-line shell script from TaskAdmin.

Clean up TaskDriver, move all command-line support code to a new tool class TaskAdmin.java, and build a command-line shell script from TaskAdmin.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/55b84465
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/55b84465
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/55b84465

Branch: refs/heads/master
Commit: 55b844657947dea661f6067dbd32237b0ed6afe2
Parents: 4e48719
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Jan 26 13:46:00 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Oct 2 19:06:41 2017 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   4 +
 .../stages/PersistAssignmentStage.java          |   2 +-
 .../org/apache/helix/manager/zk/ZKUtil.java     |  10 +-
 .../java/org/apache/helix/task/TaskDriver.java  | 288 ++-----------------
 .../apache/helix/task/WorkflowRebalancer.java   |   4 +-
 .../java/org/apache/helix/tools/TaskAdmin.java  | 284 ++++++++++++++++++
 6 files changed, 321 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 28e115b..789a5e1 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -232,6 +232,10 @@ under the License.
               <mainClass>org.apache.helix.tools.ZkGrep</mainClass>
               <name>zkgrep</name>
             </program>
+            <program>
+              <mainClass>org.apache.helix.tools.TaskAdmin</mainClass>
+              <name>task-admin</name>
+            </program>
           </programs>
         </configuration>
       </plugin>
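
The new <program> entry registers TaskAdmin with the appassembler plugin, so the build now generates a task-admin wrapper script next to the existing Helix tool scripts; the script simply forwards its arguments to TaskAdmin.main. A minimal sketch of the equivalent programmatic call (the ZK address, cluster, and workflow names are placeholders, not taken from the patch):

// Sketch only: placeholder arguments. The generated task-admin script boils down to
// invoking this main class with the same argument layout (command first, then options).
public class TaskAdminInvocationSketch {
  public static void main(String[] args) throws Exception {
    org.apache.helix.tools.TaskAdmin.main(new String[] {
        "list",                     // DriverCommand parsed from args[0]
        "--zk", "localhost:2181",   // required ZooKeeper endpoint
        "--cluster", "MyCluster",   // required cluster name
        "--resource", "MyWorkflow"  // workflow (or job queue) to operate on
    });
  }
}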

http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index b55a838..cd320a4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -96,7 +96,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
           partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId);
         }
 
-        Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap());
+        Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap();
 
         if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) {
           for (Partition partition : assignmentToPersist.keySet()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 38b74cb..7300e07 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -27,7 +27,9 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.HelixConfigScope;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
@@ -52,11 +54,11 @@ public final class ZKUtil {
     ArrayList<String> requiredPaths = new ArrayList<String>();
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.IDEALSTATES, clusterName));
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.CLUSTER.toString(), clusterName));
+        HelixConfigScope.ConfigScopeProperty.CLUSTER.toString(), clusterName));
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.PARTICIPANT.toString()));
+        HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString()));
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.RESOURCE.toString()));
+        HelixConfigScope.ConfigScopeProperty.RESOURCE.toString()));
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.PROPERTYSTORE, clusterName));
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.LIVEINSTANCES, clusterName));
     requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.INSTANCES, clusterName));
@@ -94,7 +96,7 @@ public final class ZKUtil {
     if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) {
       ArrayList<String> requiredPaths = new ArrayList<String>();
       requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-          ConfigScopeProperty.PARTICIPANT.toString(), instanceName));
+          HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString(), instanceName));
       requiredPaths.add(PropertyPathBuilder
           .getPath(PropertyType.MESSAGES, clusterName, instanceName));
       requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CURRENTSTATES, clusterName,
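
The ZKUtil change only switches these references to the fully qualified nested enum (HelixConfigScope.ConfigScopeProperty) and adds the matching imports; the paths being built stay the same. A small illustrative sketch (the /MyCluster/... layout in the comments reflects the conventional Helix config path structure and is not introduced by this patch):

// Illustrative only: prints the scope strings ZKUtil appends under a cluster's CONFIGS node.
import org.apache.helix.model.HelixConfigScope;

public class ConfigScopePropertySketch {
  public static void main(String[] args) {
    String cluster = "MyCluster"; // placeholder cluster name
    // e.g. /MyCluster/CONFIGS/CLUSTER/MyCluster
    System.out.println("/" + cluster + "/CONFIGS/"
        + HelixConfigScope.ConfigScopeProperty.CLUSTER + "/" + cluster);
    // e.g. /MyCluster/CONFIGS/PARTICIPANT/<instanceName>
    System.out.println("/" + cluster + "/CONFIGS/"
        + HelixConfigScope.ConfigScopeProperty.PARTICIPANT);
    // e.g. /MyCluster/CONFIGS/RESOURCE/<resourceName>
    System.out.println("/" + cluster + "/CONFIGS/"
        + HelixConfigScope.ConfigScopeProperty.RESOURCE);
  }
}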

http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a639cd0..97703f7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -18,11 +18,8 @@ package org.apache.helix.task;
  * specific language governing permissions and limitations
  * under the License.
  */
-
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -30,26 +27,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.I0Itec.zkclient.DataUpdater;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -71,32 +56,6 @@ import com.google.common.collect.Sets;
  * CLI for scheduling/canceling workflows
  */
 public class TaskDriver {
-  /** For logging */
-  private static final Logger LOG = Logger.getLogger(TaskDriver.class);
-
-  /** Required option name for Helix endpoint */
-  private static final String ZK_ADDRESS = "zk";
-
-  /** Required option name for cluster against which to run task */
-  private static final String CLUSTER_NAME_OPTION = "cluster";
-
-  /** Required option name for task resource within target cluster */
-  private static final String RESOURCE_OPTION = "resource";
-
-  /** Field for specifying a workflow file when starting a job */
-  private static final String WORKFLOW_FILE_OPTION = "file";
-
-  /** Default time out for monitoring workflow or job state */
-  private final static int _defaultTimeout = 2 * 60 * 1000; /* 2 mins */
-
-
-  private final HelixDataAccessor _accessor;
-  private final ConfigAccessor _cfgAccessor;
-  private final HelixPropertyStore<ZNRecord> _propertyStore;
-  private final HelixAdmin _admin;
-  private final String _clusterName;
-
-  /** Commands which may be parsed from the first argument to main */
   public enum DriverCommand {
     start,
     stop,
@@ -107,9 +66,20 @@ public class TaskDriver {
     clean
   }
 
+  /** For logging */
+  private static final Logger LOG = Logger.getLogger(TaskDriver.class);
+
+  /** Default time out for monitoring workflow or job state */
+  private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
+
+  private final HelixDataAccessor _accessor;
+  private final HelixPropertyStore<ZNRecord> _propertyStore;
+  private final HelixAdmin _admin;
+  private final String _clusterName;
+
   public TaskDriver(HelixManager manager) {
-    this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager
-        .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
+    this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
+        manager.getHelixPropertyStore(), manager.getClusterName());
   }
 
   public TaskDriver(ZkClient client, String clusterName) {
@@ -118,80 +88,24 @@ public class TaskDriver {
 
   public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
-            PropertyPathBuilder.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName);
+        new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+            PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
   }
 
+  @Deprecated
   public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
       HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
+    this(admin, accessor, propertyStore, clusterName);
+  }
+
+  public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor,
+      HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
     _admin = admin;
     _accessor = accessor;
-    _cfgAccessor = cfgAccessor;
     _propertyStore = propertyStore;
     _clusterName = clusterName;
   }
 
-  /**
-   * Parses the first argument as a driver command and the rest of the
-   * arguments are parsed based on that command. Constructs a Helix
-   * message and posts it to the controller
-   */
-  public static void main(String[] args) throws Exception {
-    String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
-    CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
-    String zkAddr = cl.getOptionValue(ZK_ADDRESS);
-    String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
-    String resource = cl.getOptionValue(RESOURCE_OPTION);
-
-    if (zkAddr == null || clusterName == null || resource == null) {
-      printUsage(constructOptions(), "[cmd]");
-      throw new IllegalArgumentException(
-          "zk, cluster, and resource must all be non-null for all commands");
-    }
-
-    HelixManager helixMgr =
-        HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR,
-            zkAddr);
-    helixMgr.connect();
-    TaskDriver driver = new TaskDriver(helixMgr);
-    try {
-      DriverCommand cmd = DriverCommand.valueOf(args[0]);
-      switch (cmd) {
-      case start:
-        if (cl.hasOption(WORKFLOW_FILE_OPTION)) {
-          driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
-        } else {
-          throw new IllegalArgumentException("Workflow file is required to start flow.");
-        }
-        break;
-      case stop:
-        driver.setWorkflowTargetState(resource, TargetState.STOP);
-        break;
-      case resume:
-        driver.setWorkflowTargetState(resource, TargetState.START);
-        break;
-      case delete:
-        driver.setWorkflowTargetState(resource, TargetState.DELETE);
-        break;
-      case list:
-        driver.list(resource);
-        break;
-      case flush:
-        driver.flushQueue(resource);
-        break;
-      case clean:
-        driver.cleanupJobQueue(resource);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown command " + args[0]);
-      }
-    } catch (IllegalArgumentException e) {
-      LOG.error("Unknown driver command " + args[0]);
-      throw e;
-    }
-
-    helixMgr.disconnect();
-  }
 
   /** Schedules a new workflow
    *
@@ -284,7 +198,7 @@ public class TaskDriver {
   }
 
   /**
-   * Flushes a named job queue
+   * Remove all jobs in a job queue
    *
    * @param queueName
    * @throws Exception
@@ -626,8 +540,8 @@ public class TaskDriver {
   }
 
   /**
-   * Clean up final state jobs (ABORTED, FAILED, COMPLETED),
-   * which will consume the capacity, in job queue
+   * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
+   * The job config and job context will be removed from ZooKeeper.
    *
    * @param queueName The name of job queue
    */
@@ -688,7 +602,6 @@ public class TaskDriver {
     return is;
   }
 
-
   /**
    * Add new job config to cluster
    */
@@ -881,59 +794,6 @@ public class TaskDriver {
     return workflowConfigMap;
   }
 
-  public void list(String resource) {
-    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_accessor, resource);
-    if (wCfg == null) {
-      LOG.error("Workflow " + resource + " does not exist!");
-      return;
-    }
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, resource);
-
-    LOG.info("Workflow " + resource + " consists of the following tasks: "
-        + wCfg.getJobDag().getAllNodes());
-    String workflowState =
-        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
-    LOG.info("Current state of workflow is " + workflowState);
-    LOG.info("Job states are: ");
-    LOG.info("-------");
-    for (String job : wCfg.getJobDag().getAllNodes()) {
-      TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
-      LOG.info("Job " + job + " is " + jobState);
-
-      // fetch job information
-      JobConfig jCfg = TaskUtil.getJobCfg(_accessor, job);
-      JobContext jCtx = TaskUtil.getJobContext(_propertyStore, job);
-      if (jCfg == null || jCtx == null) {
-        LOG.info("-------");
-        continue;
-      }
-
-      // calculate taskPartitions
-      List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
-      Collections.sort(partitions);
-
-      // report status
-      for (Integer partition : partitions) {
-        String taskId = jCtx.getTaskIdForPartition(partition);
-        taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
-        LOG.info("Task: " + taskId);
-        TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
-        if (taskConfig != null) {
-          LOG.info("Configuration: " + taskConfig.getConfigMap());
-        }
-        TaskPartitionState state = jCtx.getPartitionState(partition);
-        state = (state != null) ? state : TaskPartitionState.INIT;
-        LOG.info("State: " + state);
-        String assignedParticipant = jCtx.getAssignedParticipant(partition);
-        if (assignedParticipant != null) {
-          LOG.info("Assigned participant: " + assignedParticipant);
-        }
-        LOG.info("-------");
-      }
-      LOG.info("-------");
-    }
-  }
-
   /**
    * This call will be blocked until either workflow reaches to one of the state specified
    * in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException
@@ -1051,104 +911,4 @@ public class TaskDriver {
       throws InterruptedException {
     return pollForJobState(workflowName, jobName, _defaultTimeout, states);
   }
-
-  /** Constructs options set for all basic control messages */
-  private static Options constructOptions() {
-    Options options = new Options();
-    options.addOptionGroup(contructGenericRequiredOptionGroup());
-    options.addOptionGroup(constructStartOptionGroup());
-    return options;
-  }
-
-  /** Constructs option group containing options required by all drivable jobs */
-  @SuppressWarnings("static-access")
-  private static OptionGroup contructGenericRequiredOptionGroup() {
-    Option zkAddressOption =
-        OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
-            .withDescription("ZK address managing cluster").create();
-    zkAddressOption.setArgs(1);
-    zkAddressOption.setArgName("zkAddress");
-
-    Option clusterNameOption =
-        OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
-            .create();
-    clusterNameOption.setArgs(1);
-    clusterNameOption.setArgName("clusterName");
-
-    Option taskResourceOption =
-        OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
-            .withDescription("Workflow or job name").create();
-    taskResourceOption.setArgs(1);
-    taskResourceOption.setArgName("resourceName");
-
-    OptionGroup group = new OptionGroup();
-    group.addOption(zkAddressOption);
-    group.addOption(clusterNameOption);
-    group.addOption(taskResourceOption);
-    return group;
-  }
-
-  /** Constructs option group containing options required by all drivable jobs */
-  private static OptionGroup constructStartOptionGroup() {
-    @SuppressWarnings("static-access")
-    Option workflowFileOption =
-        OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
-            .withDescription("Local file describing workflow").create();
-    workflowFileOption.setArgs(1);
-    workflowFileOption.setArgName("workflowFile");
-
-    OptionGroup group = new OptionGroup();
-    group.addOption(workflowFileOption);
-    return group;
-  }
-
-  /** Attempts to parse options for given command, printing usage under failure */
-  private static CommandLine parseOptions(String[] args, Options options, String cmdStr) {
-    CommandLineParser cliParser = new GnuParser();
-    CommandLine cmd = null;
-
-    try {
-      cmd = cliParser.parse(options, args);
-    } catch (ParseException pe) {
-      LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString());
-      printUsage(options, cmdStr);
-      System.exit(1);
-    }
-    boolean ret = checkOptionArgsNumber(cmd.getOptions());
-    if (!ret) {
-      printUsage(options, cmdStr);
-      System.exit(1);
-    }
-
-    return cmd;
-  }
-
-  /** Ensures options argument counts are correct */
-  private static boolean checkOptionArgsNumber(Option[] options) {
-    for (Option option : options) {
-      int argNb = option.getArgs();
-      String[] args = option.getValues();
-      if (argNb == 0) {
-        if (args != null && args.length > 0) {
-          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
-              + Arrays.toString(args) + ")");
-          return false;
-        }
-      } else {
-        if (args == null || args.length != argNb) {
-          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
-              + Arrays.toString(args) + ")");
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /** Displays CLI usage for given option set and command name */
-  private static void printUsage(Options cliOptions, String cmd) {
-    HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.setWidth(1000);
-    helpFormatter.printHelp("java " + TaskDriver.class.getName() + " " + cmd, cliOptions);
-  }
 }
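
With the ConfigAccessor-based constructor deprecated, a TaskDriver is now built either from a HelixManager or directly from a ZkClient. A minimal construction sketch, assuming a reachable ZooKeeper at a placeholder address and an existing cluster; the ZNRecordSerializer wiring follows the usual Helix recipe pattern and is not part of this patch:

// Minimal sketch: "localhost:2181" and "MyCluster" are placeholders.
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.task.TaskDriver;

public class TaskDriverConstructionSketch {
  public static void main(String[] args) {
    ZkClient zkClient = new ZkClient("localhost:2181", ZkClient.DEFAULT_SESSION_TIMEOUT,
        ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
    try {
      // The ZkClient-based constructor now builds the property store itself via
      // PropertyPathBuilder.propertyStore(clusterName); no ConfigAccessor is required.
      TaskDriver driver = new TaskDriver(zkClient, "MyCluster");
      System.out.println("Known workflows: " + driver.getWorkflows().keySet());
    } finally {
      zkClient.close();
    }
  }
}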

http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 11c6a61..830f93a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -482,8 +482,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
 
   /**
-   * Cleans up workflow configs and workflow contexts associated with this workflow,
-   * including all job-level configs and context, plus workflow-level information.
+   * Cleans up the job config and job context associated with this job,
+   * plus the job info in the workflow context.
    */
   private void cleanupJob(final String job, String workflow) {
     LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);

http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
new file mode 100644
index 0000000..7688017
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
@@ -0,0 +1,284 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.log4j.Logger;
+
+/**
+ * CLI for operating workflows and jobs.
+ * A thin wrapper around a TaskDriver instance that allows workflows and jobs to be managed from the command line.
+ */
+public class TaskAdmin {
+  /** For logging */
+  private static final Logger LOG = Logger.getLogger(TaskAdmin.class);
+
+  /** Required option name for Helix endpoint */
+  private static final String ZK_ADDRESS = "zk";
+
+  /** Required option name for cluster against which to run task */
+  private static final String CLUSTER_NAME_OPTION = "cluster";
+
+  /** Required option name for task resource within target cluster */
+  private static final String RESOURCE_OPTION = "resource";
+
+  /** Field for specifying a workflow file when starting a job */
+  private static final String WORKFLOW_FILE_OPTION = "file";
+
+  /**
+   * Parses the first argument as a driver command; the remaining arguments
+   * are parsed based on that command and dispatched to the corresponding
+   * TaskDriver operation.
+   */
+  public static void main(String[] args) throws Exception {
+    String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
+    CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
+    String zkAddr = cl.getOptionValue(ZK_ADDRESS);
+    String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
+    String workflow = cl.getOptionValue(RESOURCE_OPTION);
+
+    if (zkAddr == null || clusterName == null || workflow == null) {
+      printUsage(constructOptions(), "[cmd]");
+      throw new IllegalArgumentException(
+          "zk, cluster, and resource must all be non-null for all commands");
+    }
+
+    HelixManager helixMgr =
+        HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR,
+            zkAddr);
+    helixMgr.connect();
+    TaskDriver driver = new TaskDriver(helixMgr);
+    try {
+      TaskDriver.DriverCommand cmd = TaskDriver.DriverCommand.valueOf(args[0]);
+      switch (cmd) {
+      case start:
+        if (cl.hasOption(WORKFLOW_FILE_OPTION)) {
+          driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
+        } else {
+          throw new IllegalArgumentException("Workflow file is required to start flow.");
+        }
+        break;
+      case stop:
+        driver.stop(workflow);
+        break;
+      case resume:
+        driver.resume(workflow);
+        break;
+      case delete:
+        driver.delete(workflow);
+        break;
+      case list:
+        list(driver, workflow);
+        break;
+      case flush:
+        driver.flushQueue(workflow);
+        break;
+      case clean:
+        driver.cleanupJobQueue(workflow);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown command " + args[0]);
+      }
+    } catch (IllegalArgumentException e) {
+      LOG.error("Unknown driver command " + args[0]);
+      throw e;
+    }
+
+    helixMgr.disconnect();
+  }
+
+  private static void list(TaskDriver taskDriver, String workflow) {
+    WorkflowConfig wCfg = taskDriver.getWorkflowConfig(workflow);
+    if (wCfg == null) {
+      LOG.error("Workflow " + workflow + " does not exist!");
+      return;
+    }
+    WorkflowContext wCtx = taskDriver.getWorkflowContext(workflow);
+
+    LOG.info("Workflow " + workflow + " consists of the following tasks: " + wCfg.getJobDag()
+        .getAllNodes());
+    String workflowState =
+        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+    LOG.info("Current state of workflow is " + workflowState);
+    LOG.info("Job states are: ");
+    LOG.info("-------");
+    for (String job : wCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
+      LOG.info("Job " + job + " is " + jobState);
+
+      // fetch job information
+      JobConfig jCfg = taskDriver.getJobConfig(job);
+      JobContext jCtx = taskDriver.getJobContext(job);
+      if (jCfg == null || jCtx == null) {
+        LOG.info("-------");
+        continue;
+      }
+
+      // calculate taskPartitions
+      List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
+      Collections.sort(partitions);
+
+      // report status
+      for (Integer partition : partitions) {
+        String taskId = jCtx.getTaskIdForPartition(partition);
+        taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
+        LOG.info("Task: " + taskId);
+        TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
+        if (taskConfig != null) {
+          LOG.info("Configuration: " + taskConfig.getConfigMap());
+        }
+        TaskPartitionState state = jCtx.getPartitionState(partition);
+        state = (state != null) ? state : TaskPartitionState.INIT;
+        LOG.info("State: " + state);
+        String assignedParticipant = jCtx.getAssignedParticipant(partition);
+        if (assignedParticipant != null) {
+          LOG.info("Assigned participant: " + assignedParticipant);
+        }
+        LOG.info("-------");
+      }
+      LOG.info("-------");
+    }
+  }
+
+  /** Constructs option group containing options required by all drivable jobs */
+  @SuppressWarnings("static-access")
+  private static OptionGroup contructGenericRequiredOptionGroup() {
+    Option zkAddressOption =
+        OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
+            .withDescription("ZK address managing cluster").create();
+    zkAddressOption.setArgs(1);
+    zkAddressOption.setArgName("zkAddress");
+
+    Option clusterNameOption =
+        OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
+            .create();
+    clusterNameOption.setArgs(1);
+    clusterNameOption.setArgName("clusterName");
+
+    Option taskResourceOption =
+        OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
+            .withDescription("Workflow or job name").create();
+    taskResourceOption.setArgs(1);
+    taskResourceOption.setArgName("resourceName");
+
+    OptionGroup group = new OptionGroup();
+    group.addOption(zkAddressOption);
+    group.addOption(clusterNameOption);
+    group.addOption(taskResourceOption);
+    return group;
+  }
+
+  /** Constructs options set for all basic control messages */
+  private static Options constructOptions() {
+    Options options = new Options();
+    options.addOptionGroup(contructGenericRequiredOptionGroup());
+    options.addOptionGroup(constructStartOptionGroup());
+    return options;
+  }
+
+  /** Constructs option group containing options required by all drivable jobs */
+  private static OptionGroup constructStartOptionGroup() {
+    @SuppressWarnings("static-access")
+    Option workflowFileOption =
+        OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
+            .withDescription("Local file describing workflow").create();
+    workflowFileOption.setArgs(1);
+    workflowFileOption.setArgName("workflowFile");
+
+    OptionGroup group = new OptionGroup();
+    group.addOption(workflowFileOption);
+    return group;
+  }
+
+  /** Attempts to parse options for given command, printing usage under failure */
+  private static CommandLine parseOptions(String[] args, Options options, String cmdStr) {
+    CommandLineParser cliParser = new GnuParser();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(options, args);
+    } catch (ParseException pe) {
+      LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString());
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+    boolean ret = checkOptionArgsNumber(cmd.getOptions());
+    if (!ret) {
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+
+    return cmd;
+  }
+
+  /** Ensures options argument counts are correct */
+  private static boolean checkOptionArgsNumber(Option[] options) {
+    for (Option option : options) {
+      int argNb = option.getArgs();
+      String[] args = option.getValues();
+      if (argNb == 0) {
+        if (args != null && args.length > 0) {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+              + Arrays.toString(args) + ")");
+          return false;
+        }
+      } else {
+        if (args == null || args.length != argNb) {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+              + Arrays.toString(args) + ")");
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /** Displays CLI usage for given option set and command name */
+  private static void printUsage(Options cliOptions, String cmd) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + TaskAdmin.class.getName() + " " + cmd, cliOptions);
+  }
+}
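
As the switch in main() shows, each TaskAdmin command is a thin wrapper over the corresponding TaskDriver call (stop, resume, delete, flushQueue, cleanupJobQueue), so the same operations can also be scripted directly against the API. A short sketch, assuming an already-connected HelixManager and a workflow named "MyWorkflow" (both placeholders):

// Sketch only: the manager is assumed to be connected; "MyWorkflow" is a placeholder.
import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskDriver;

public class WorkflowControlSketch {
  public static void controlWorkflow(HelixManager connectedManager) throws Exception {
    TaskDriver driver = new TaskDriver(connectedManager);
    driver.stop("MyWorkflow");    // what "task-admin stop ... --resource MyWorkflow" runs
    driver.resume("MyWorkflow");  // what "task-admin resume ..." runs
    driver.delete("MyWorkflow");  // what "task-admin delete ..." runs
  }
}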