You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 06:01:26 UTC
[3/3] helix git commit: Clean up TaskDriver,
move all command line support code to a new tool class TaskAdmin.java,
and build a commandline shell script from TaskAdmin.
Clean up TaskDriver: move all command-line support code into a new tool class, TaskAdmin.java, and build a command-line shell script (task-admin) from TaskAdmin.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/55b84465
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/55b84465
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/55b84465
Branch: refs/heads/master
Commit: 55b844657947dea661f6067dbd32237b0ed6afe2
Parents: 4e48719
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Jan 26 13:46:00 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Oct 2 19:06:41 2017 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 4 +
.../stages/PersistAssignmentStage.java | 2 +-
.../org/apache/helix/manager/zk/ZKUtil.java | 10 +-
.../java/org/apache/helix/task/TaskDriver.java | 288 ++-----------------
.../apache/helix/task/WorkflowRebalancer.java | 4 +-
.../java/org/apache/helix/tools/TaskAdmin.java | 284 ++++++++++++++++++
6 files changed, 321 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 28e115b..789a5e1 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -232,6 +232,10 @@ under the License.
<mainClass>org.apache.helix.tools.ZkGrep</mainClass>
<name>zkgrep</name>
</program>
+ <program>
+ <mainClass>org.apache.helix.tools.TaskAdmin</mainClass>
+ <name>task-admin</name>
+ </program>
</programs>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index b55a838..cd320a4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -96,7 +96,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId);
}
- Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap());
+ Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap();
if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) {
for (Partition partition : assignmentToPersist.keySet()) {
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 38b74cb..7300e07 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -27,7 +27,9 @@ import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.model.HelixConfigScope;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
@@ -52,11 +54,11 @@ public final class ZKUtil {
ArrayList<String> requiredPaths = new ArrayList<String>();
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.IDEALSTATES, clusterName));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.CLUSTER.toString(), clusterName));
+ HelixConfigScope.ConfigScopeProperty.CLUSTER.toString(), clusterName));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString()));
+ HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString()));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.RESOURCE.toString()));
+ HelixConfigScope.ConfigScopeProperty.RESOURCE.toString()));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.PROPERTYSTORE, clusterName));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.LIVEINSTANCES, clusterName));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.INSTANCES, clusterName));
@@ -94,7 +96,7 @@ public final class ZKUtil {
if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) {
ArrayList<String> requiredPaths = new ArrayList<String>();
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(), instanceName));
+ HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString(), instanceName));
requiredPaths.add(PropertyPathBuilder
.getPath(PropertyType.MESSAGES, clusterName, instanceName));
requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CURRENTSTATES, clusterName,
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a639cd0..97703f7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -18,11 +18,8 @@ package org.apache.helix.task;
* specific language governing permissions and limitations
* under the License.
*/
-
-import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,26 +27,14 @@ import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.DataUpdater;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -71,32 +56,6 @@ import com.google.common.collect.Sets;
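* Java API for scheduling, controlling, and monitoring workflows and jobs. The command-line
* interface is provided by org.apache.helix.tools.TaskAdmin.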
*/
public class TaskDriver {
- /** For logging */
- private static final Logger LOG = Logger.getLogger(TaskDriver.class);
-
- /** Required option name for Helix endpoint */
- private static final String ZK_ADDRESS = "zk";
-
- /** Required option name for cluster against which to run task */
- private static final String CLUSTER_NAME_OPTION = "cluster";
-
- /** Required option name for task resource within target cluster */
- private static final String RESOURCE_OPTION = "resource";
-
- /** Field for specifying a workflow file when starting a job */
- private static final String WORKFLOW_FILE_OPTION = "file";
-
- /** Default time out for monitoring workflow or job state */
- private final static int _defaultTimeout = 2 * 60 * 1000; /* 2 mins */
-
-
- private final HelixDataAccessor _accessor;
- private final ConfigAccessor _cfgAccessor;
- private final HelixPropertyStore<ZNRecord> _propertyStore;
- private final HelixAdmin _admin;
- private final String _clusterName;
-
- /** Commands which may be parsed from the first argument to main */
public enum DriverCommand {
start,
stop,
@@ -107,9 +66,20 @@ public class TaskDriver {
clean
}
+ /** For logging */
+ private static final Logger LOG = Logger.getLogger(TaskDriver.class);
+
+ /** Default time out for monitoring workflow or job state */
+ private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
+
+ private final HelixDataAccessor _accessor;
+ private final HelixPropertyStore<ZNRecord> _propertyStore;
+ private final HelixAdmin _admin;
+ private final String _clusterName;
+
+ /**
+ * Creates a driver that uses the cluster management tool, data accessor, property store and
+ * cluster name obtained from {@code manager}.
+ *
+ * @param manager Helix manager whose components back this driver
+ */
public TaskDriver(HelixManager manager) {
- this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager
- .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
+ this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
+ manager.getHelixPropertyStore(), manager.getClusterName());
}
public TaskDriver(ZkClient client, String clusterName) {
@@ -118,80 +88,24 @@ public class TaskDriver {
+ /**
+ * Creates a driver that talks to ZooKeeper directly: the admin tool is built on {@code client},
+ * while the data accessor and the property store (rooted at the cluster's property-store path)
+ * are built on {@code baseAccessor}.
+ *
+ * @param client ZooKeeper client used by the admin tool
+ * @param baseAccessor base accessor used for cluster data and the property store
+ * @param clusterName name of the cluster to operate on
+ */
public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
- new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
- PropertyPathBuilder.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName);
+ new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+ PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
}
+ /**
+ * Creates a driver from explicit Helix components.
+ *
+ * @deprecated {@code cfgAccessor} is no longer used and is ignored; use
+ * {@link #TaskDriver(HelixAdmin, HelixDataAccessor, HelixPropertyStore, String)} instead.
+ */
+ @Deprecated
public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
+ // cfgAccessor is intentionally dropped; the driver no longer keeps a ConfigAccessor.
+ this(admin, accessor, propertyStore, clusterName);
+ }
+
+ /**
+ * Creates a driver from explicit Helix components.
+ *
+ * @param admin admin tool stored for use by this driver
+ * @param accessor data accessor for the cluster
+ * @param propertyStore property store for the cluster
+ * @param clusterName name of the cluster to operate on
+ */
+ public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor,
+ HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
_admin = admin;
_accessor = accessor;
- _cfgAccessor = cfgAccessor;
_propertyStore = propertyStore;
_clusterName = clusterName;
}
- /**
- * Parses the first argument as a driver command and the rest of the
- * arguments are parsed based on that command. Constructs a Helix
- * message and posts it to the controller
- */
- public static void main(String[] args) throws Exception {
- String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
- CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
- String zkAddr = cl.getOptionValue(ZK_ADDRESS);
- String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
- String resource = cl.getOptionValue(RESOURCE_OPTION);
-
- if (zkAddr == null || clusterName == null || resource == null) {
- printUsage(constructOptions(), "[cmd]");
- throw new IllegalArgumentException(
- "zk, cluster, and resource must all be non-null for all commands");
- }
-
- HelixManager helixMgr =
- HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR,
- zkAddr);
- helixMgr.connect();
- TaskDriver driver = new TaskDriver(helixMgr);
- try {
- DriverCommand cmd = DriverCommand.valueOf(args[0]);
- switch (cmd) {
- case start:
- if (cl.hasOption(WORKFLOW_FILE_OPTION)) {
- driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
- } else {
- throw new IllegalArgumentException("Workflow file is required to start flow.");
- }
- break;
- case stop:
- driver.setWorkflowTargetState(resource, TargetState.STOP);
- break;
- case resume:
- driver.setWorkflowTargetState(resource, TargetState.START);
- break;
- case delete:
- driver.setWorkflowTargetState(resource, TargetState.DELETE);
- break;
- case list:
- driver.list(resource);
- break;
- case flush:
- driver.flushQueue(resource);
- break;
- case clean:
- driver.cleanupJobQueue(resource);
- break;
- default:
- throw new IllegalArgumentException("Unknown command " + args[0]);
- }
- } catch (IllegalArgumentException e) {
- LOG.error("Unknown driver command " + args[0]);
- throw e;
- }
-
- helixMgr.disconnect();
- }
/** Schedules a new workflow
*
@@ -284,7 +198,7 @@ public class TaskDriver {
}
/**
- * Flushes a named job queue
+ * Remove all jobs in a job queue
*
* @param queueName
* @throws Exception
@@ -626,8 +540,8 @@ public class TaskDriver {
}
/**
- * Clean up final state jobs (ABORTED, FAILED, COMPLETED),
- * which will consume the capacity, in job queue
+ * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
+ * The job configs and job contexts of the removed jobs are deleted from ZooKeeper as well.
*
* @param queueName The name of job queue
*/
@@ -688,7 +602,6 @@ public class TaskDriver {
return is;
}
-
/**
* Add new job config to cluster
*/
@@ -881,59 +794,6 @@ public class TaskDriver {
return workflowConfigMap;
}
- public void list(String resource) {
- WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_accessor, resource);
- if (wCfg == null) {
- LOG.error("Workflow " + resource + " does not exist!");
- return;
- }
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, resource);
-
- LOG.info("Workflow " + resource + " consists of the following tasks: "
- + wCfg.getJobDag().getAllNodes());
- String workflowState =
- (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
- LOG.info("Current state of workflow is " + workflowState);
- LOG.info("Job states are: ");
- LOG.info("-------");
- for (String job : wCfg.getJobDag().getAllNodes()) {
- TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
- LOG.info("Job " + job + " is " + jobState);
-
- // fetch job information
- JobConfig jCfg = TaskUtil.getJobCfg(_accessor, job);
- JobContext jCtx = TaskUtil.getJobContext(_propertyStore, job);
- if (jCfg == null || jCtx == null) {
- LOG.info("-------");
- continue;
- }
-
- // calculate taskPartitions
- List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
- Collections.sort(partitions);
-
- // report status
- for (Integer partition : partitions) {
- String taskId = jCtx.getTaskIdForPartition(partition);
- taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
- LOG.info("Task: " + taskId);
- TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
- if (taskConfig != null) {
- LOG.info("Configuration: " + taskConfig.getConfigMap());
- }
- TaskPartitionState state = jCtx.getPartitionState(partition);
- state = (state != null) ? state : TaskPartitionState.INIT;
- LOG.info("State: " + state);
- String assignedParticipant = jCtx.getAssignedParticipant(partition);
- if (assignedParticipant != null) {
- LOG.info("Assigned participant: " + assignedParticipant);
- }
- LOG.info("-------");
- }
- LOG.info("-------");
- }
- }
-
/**
* This call will be blocked until either workflow reaches to one of the state specified
* in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException
@@ -1051,104 +911,4 @@ public class TaskDriver {
throws InterruptedException {
return pollForJobState(workflowName, jobName, _defaultTimeout, states);
}
-
- /** Constructs options set for all basic control messages */
- private static Options constructOptions() {
- Options options = new Options();
- options.addOptionGroup(contructGenericRequiredOptionGroup());
- options.addOptionGroup(constructStartOptionGroup());
- return options;
- }
-
- /** Constructs option group containing options required by all drivable jobs */
- @SuppressWarnings("static-access")
- private static OptionGroup contructGenericRequiredOptionGroup() {
- Option zkAddressOption =
- OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
- .withDescription("ZK address managing cluster").create();
- zkAddressOption.setArgs(1);
- zkAddressOption.setArgName("zkAddress");
-
- Option clusterNameOption =
- OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
- .create();
- clusterNameOption.setArgs(1);
- clusterNameOption.setArgName("clusterName");
-
- Option taskResourceOption =
- OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
- .withDescription("Workflow or job name").create();
- taskResourceOption.setArgs(1);
- taskResourceOption.setArgName("resourceName");
-
- OptionGroup group = new OptionGroup();
- group.addOption(zkAddressOption);
- group.addOption(clusterNameOption);
- group.addOption(taskResourceOption);
- return group;
- }
-
- /** Constructs option group containing options required by all drivable jobs */
- private static OptionGroup constructStartOptionGroup() {
- @SuppressWarnings("static-access")
- Option workflowFileOption =
- OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
- .withDescription("Local file describing workflow").create();
- workflowFileOption.setArgs(1);
- workflowFileOption.setArgName("workflowFile");
-
- OptionGroup group = new OptionGroup();
- group.addOption(workflowFileOption);
- return group;
- }
-
- /** Attempts to parse options for given command, printing usage under failure */
- private static CommandLine parseOptions(String[] args, Options options, String cmdStr) {
- CommandLineParser cliParser = new GnuParser();
- CommandLine cmd = null;
-
- try {
- cmd = cliParser.parse(options, args);
- } catch (ParseException pe) {
- LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString());
- printUsage(options, cmdStr);
- System.exit(1);
- }
- boolean ret = checkOptionArgsNumber(cmd.getOptions());
- if (!ret) {
- printUsage(options, cmdStr);
- System.exit(1);
- }
-
- return cmd;
- }
-
- /** Ensures options argument counts are correct */
- private static boolean checkOptionArgsNumber(Option[] options) {
- for (Option option : options) {
- int argNb = option.getArgs();
- String[] args = option.getValues();
- if (argNb == 0) {
- if (args != null && args.length > 0) {
- System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
- + Arrays.toString(args) + ")");
- return false;
- }
- } else {
- if (args == null || args.length != argNb) {
- System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
- + Arrays.toString(args) + ")");
- return false;
- }
- }
- }
- return true;
- }
-
- /** Displays CLI usage for given option set and command name */
- private static void printUsage(Options cliOptions, String cmd) {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.setWidth(1000);
- helpFormatter.printHelp("java " + TaskDriver.class.getName() + " " + cmd, cliOptions);
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 11c6a61..830f93a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -482,8 +482,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
/**
- * Cleans up workflow configs and workflow contexts associated with this workflow,
- * including all job-level configs and context, plus workflow-level information.
+ * Cleans up the job config and job context associated with this job, and removes the job's
+ * information from the workflow context.
*/
private void cleanupJob(final String job, String workflow) {
LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
new file mode 100644
index 0000000..7688017
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
@@ -0,0 +1,284 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Command-line tool for operating workflows and jobs.
+ *
+ * <p>This is a thin wrapper around a {@link TaskDriver} instance that lets workflows and job
+ * queues be started, stopped, resumed, deleted, listed, flushed and cleaned from the command line:
+ *
+ * <pre>
+ *   task-admin &lt;command&gt; --zk &lt;zkAddress&gt; --cluster &lt;clusterName&gt;
+ *       --resource &lt;workflowName&gt; [--file &lt;workflowFile&gt;]
+ * </pre>
+ */
+public class TaskAdmin {
+  /** For logging */
+  private static final Logger LOG = Logger.getLogger(TaskAdmin.class);
+
+  /** Required option name for Helix endpoint */
+  private static final String ZK_ADDRESS = "zk";
+
+  /** Required option name for cluster against which to run task */
+  private static final String CLUSTER_NAME_OPTION = "cluster";
+
+  /** Required option name for task resource within target cluster */
+  private static final String RESOURCE_OPTION = "resource";
+
+  /** Option name for the local workflow file; required by the {@code start} command */
+  private static final String WORKFLOW_FILE_OPTION = "file";
+
+  /**
+   * Parses the first argument as a {@link TaskDriver.DriverCommand} and the remaining arguments
+   * as options, then runs that command against the cluster through a {@link TaskDriver}.
+   *
+   * @param args the command name followed by its options
+   * @throws IllegalArgumentException if no command or an unknown command is given, if a required
+   *     value is missing, or if {@code start} is given without a workflow file
+   * @throws Exception if connecting to the cluster or executing the command fails
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length == 0) {
+      printUsage(constructOptions(), "[cmd]");
+      throw new IllegalArgumentException("A command must be given as the first argument, one of "
+          + Arrays.toString(TaskDriver.DriverCommand.values()));
+    }
+
+    // Resolve the command up front: an unknown command fails before any ZooKeeper connection is
+    // made, and "Unknown driver command" is only logged when the command really is unknown.
+    TaskDriver.DriverCommand cmd;
+    try {
+      cmd = TaskDriver.DriverCommand.valueOf(args[0]);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Unknown driver command " + args[0]);
+      throw e;
+    }
+
+    String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
+    CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]);
+    String zkAddr = cl.getOptionValue(ZK_ADDRESS);
+    String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
+    String workflow = cl.getOptionValue(RESOURCE_OPTION);
+
+    if (zkAddr == null || clusterName == null || workflow == null) {
+      printUsage(constructOptions(), "[cmd]");
+      throw new IllegalArgumentException(
+          "zk, cluster, and resource must all be non-null for all commands");
+    }
+    // Validate command-specific arguments before connecting as well.
+    if (cmd == TaskDriver.DriverCommand.start && !cl.hasOption(WORKFLOW_FILE_OPTION)) {
+      throw new IllegalArgumentException("Workflow file is required to start flow.");
+    }
+
+    HelixManager helixMgr =
+        HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR,
+            zkAddr);
+    helixMgr.connect();
+    try {
+      execute(new TaskDriver(helixMgr), cmd, cl, workflow);
+    } finally {
+      // Release the ZooKeeper session even when the command throws.
+      helixMgr.disconnect();
+    }
+  }
+
+  /**
+   * Runs an already-validated command.
+   *
+   * @param driver driver connected to the target cluster
+   * @param cmd the command to run
+   * @param cl the parsed command line; supplies the workflow file for {@code start}
+   * @param workflow the workflow (or job queue) name given by {@code --resource}
+   * @throws Exception propagated from the underlying {@link TaskDriver} call
+   */
+  private static void execute(TaskDriver driver, TaskDriver.DriverCommand cmd, CommandLine cl,
+      String workflow) throws Exception {
+    switch (cmd) {
+      case start:
+        driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
+        break;
+      case stop:
+        driver.stop(workflow);
+        break;
+      case resume:
+        driver.resume(workflow);
+        break;
+      case delete:
+        driver.delete(workflow);
+        break;
+      case list:
+        list(driver, workflow);
+        break;
+      case flush:
+        driver.flushQueue(workflow);
+        break;
+      case clean:
+        driver.cleanupJobQueue(workflow);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown command " + cmd);
+    }
+  }
+
+  /**
+   * Logs the state of a workflow, of each of its jobs, and of each task of those jobs.
+   *
+   * @param taskDriver driver connected to the target cluster
+   * @param workflow name of the workflow to describe
+   */
+  private static void list(TaskDriver taskDriver, String workflow) {
+    WorkflowConfig wCfg = taskDriver.getWorkflowConfig(workflow);
+    if (wCfg == null) {
+      LOG.error("Workflow " + workflow + " does not exist!");
+      return;
+    }
+    WorkflowContext wCtx = taskDriver.getWorkflowContext(workflow);
+
+    LOG.info("Workflow " + workflow + " consists of the following tasks: " + wCfg.getJobDag()
+        .getAllNodes());
+    // Fall back to NOT_STARTED when there is no context or no recorded state; this guards
+    // against dereferencing a null state and against printing a literal "null".
+    TaskState workflowState = orNotStarted((wCtx != null) ? wCtx.getWorkflowState() : null);
+    LOG.info("Current state of workflow is " + workflowState.name());
+    LOG.info("Job states are: ");
+    LOG.info("-------");
+    for (String job : wCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = orNotStarted((wCtx != null) ? wCtx.getJobState(job) : null);
+      LOG.info("Job " + job + " is " + jobState);
+
+      // fetch job information
+      JobConfig jCfg = taskDriver.getJobConfig(job);
+      JobContext jCtx = taskDriver.getJobContext(job);
+      if (jCfg == null || jCtx == null) {
+        LOG.info("-------");
+        continue;
+      }
+
+      // report tasks in partition order
+      List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
+      Collections.sort(partitions);
+
+      for (Integer partition : partitions) {
+        String taskId = jCtx.getTaskIdForPartition(partition);
+        taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
+        LOG.info("Task: " + taskId);
+        TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
+        if (taskConfig != null) {
+          LOG.info("Configuration: " + taskConfig.getConfigMap());
+        }
+        TaskPartitionState state = jCtx.getPartitionState(partition);
+        state = (state != null) ? state : TaskPartitionState.INIT;
+        LOG.info("State: " + state);
+        String assignedParticipant = jCtx.getAssignedParticipant(partition);
+        if (assignedParticipant != null) {
+          LOG.info("Assigned participant: " + assignedParticipant);
+        }
+        LOG.info("-------");
+      }
+      LOG.info("-------");
+    }
+  }
+
+  /** Returns {@code state}, or {@link TaskState#NOT_STARTED} when it is null. */
+  private static TaskState orNotStarted(TaskState state) {
+    return (state != null) ? state : TaskState.NOT_STARTED;
+  }
+
+  /** Constructs the set of options accepted by every command. */
+  private static Options constructOptions() {
+    Options options = new Options();
+    // Options are added individually rather than through an OptionGroup: commons-cli treats an
+    // OptionGroup as a set of mutually exclusive options (help even renders them as "a | b | c"),
+    // and adding a group clears the members' "required" flag. zk, cluster and resource are
+    // all needed together.
+    for (Option option : constructGenericRequiredOptions()) {
+      options.addOption(option);
+    }
+    options.addOption(constructWorkflowFileOption());
+    return options;
+  }
+
+  /** Constructs the options required by all commands: ZK address, cluster name, resource name. */
+  @SuppressWarnings("static-access")
+  private static Option[] constructGenericRequiredOptions() {
+    Option zkAddressOption =
+        OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
+            .withDescription("ZK address managing cluster").create();
+    zkAddressOption.setArgs(1);
+    zkAddressOption.setArgName("zkAddress");
+
+    Option clusterNameOption =
+        OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
+            .create();
+    clusterNameOption.setArgs(1);
+    clusterNameOption.setArgName("clusterName");
+
+    Option taskResourceOption =
+        OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
+            .withDescription("Workflow or job name").create();
+    taskResourceOption.setArgs(1);
+    taskResourceOption.setArgName("resourceName");
+
+    return new Option[] { zkAddressOption, clusterNameOption, taskResourceOption };
+  }
+
+  /** Constructs the optional workflow file option used by the {@code start} command. */
+  @SuppressWarnings("static-access")
+  private static Option constructWorkflowFileOption() {
+    Option workflowFileOption =
+        OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
+            .withDescription("Local file describing workflow").create();
+    workflowFileOption.setArgs(1);
+    workflowFileOption.setArgName("workflowFile");
+    return workflowFileOption;
+  }
+
+  /**
+   * Parses options for the given command. On a parse failure or a wrong argument count, prints
+   * usage and exits the JVM with status 1.
+   */
+  private static CommandLine parseOptions(String[] args, Options options, String cmdStr) {
+    CommandLineParser cliParser = new GnuParser();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(options, args);
+    } catch (ParseException pe) {
+      LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString());
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+    boolean ret = checkOptionArgsNumber(cmd.getOptions());
+    if (!ret) {
+      printUsage(options, cmdStr);
+      System.exit(1);
+    }
+
+    return cmd;
+  }
+
+  /** Ensures each given option received exactly the number of values it declares. */
+  private static boolean checkOptionArgsNumber(Option[] options) {
+    for (Option option : options) {
+      int argNb = option.getArgs();
+      String[] args = option.getValues();
+      if (argNb == 0) {
+        if (args != null && args.length > 0) {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+              + Arrays.toString(args) + ")");
+          return false;
+        }
+      } else {
+        if (args == null || args.length != argNb) {
+          System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was "
+              + Arrays.toString(args) + ")");
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /** Displays CLI usage for given option set and command name */
+  private static void printUsage(Options cliOptions, String cmd) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + TaskAdmin.class.getName() + " " + cmd, cliOptions);
+  }
+}