You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/30 07:47:51 UTC
[2/2] falcon git commit: FALCON-592 Refactor FalconCLI to make it
more manageable. Contributed by Balu Vellanki.
FALCON-592 Refactor FalconCLI to make it more manageable. Contributed by Balu Vellanki.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d6965213
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d6965213
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d6965213
Branch: refs/heads/master
Commit: d6965213aa9fe28420d9e40dfcbd8dff07e27009
Parents: 8cc6810
Author: Ajay Yadava <aj...@gmail.com>
Authored: Wed Sep 30 11:17:00 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Sep 30 11:17:00 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/falcon/cli/FalconAdminCLI.java | 107 +++
.../java/org/apache/falcon/cli/FalconCLI.java | 861 +------------------
.../org/apache/falcon/cli/FalconEntityCLI.java | 350 ++++++++
.../apache/falcon/cli/FalconInstanceCLI.java | 327 +++++++
.../apache/falcon/cli/FalconMetadataCLI.java | 122 ++-
.../org/apache/falcon/cli/FalconRecipeCLI.java | 99 +++
.../java/org/apache/falcon/cli/FalconCLIIT.java | 8 +-
8 files changed, 987 insertions(+), 889 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d848515..22aa5ff 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Trunk (Unreleased)
FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
IMPROVEMENTS
+ FALCON-592 Refactor FalconCLI to make it more manageable(Balu Vellanki via Ajay Yadava)
+
FALCON-1472 Improvements in SLA service(Ajay Yadava)
FALCON-438 Auto generate documentation for REST API(Narayan Periwal via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/client/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
new file mode 100644
index 0000000..32c0e8c
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconClient;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Admin extension to Falcon Command Line Interface - wraps the RESTful API for admin commands.
+ */
+public class FalconAdminCLI extends FalconCLI {
+
+ private static final String STACK_OPTION = "stack";
+
+ public FalconAdminCLI() throws Exception {
+ super();
+ }
+
+ public Options createAdminOptions() {
+ Options adminOptions = new Options();
+ Option url = new Option(URL_OPTION, true, "Falcon URL");
+ adminOptions.addOption(url);
+
+ OptionGroup group = new OptionGroup();
+ Option status = new Option(STATUS_OPT, false,
+ "show the current system status");
+ Option version = new Option(VERSION_OPT, false,
+ "show Falcon server build version");
+ Option stack = new Option(STACK_OPTION, false,
+ "show the thread stack dump");
+ Option doAs = new Option(DO_AS_OPT, true,
+ "doAs user");
+ Option help = new Option("help", false, "show Falcon help");
+ group.addOption(status);
+ group.addOption(version);
+ group.addOption(stack);
+ group.addOption(help);
+
+ adminOptions.addOptionGroup(group);
+ adminOptions.addOption(doAs);
+ return adminOptions;
+ }
+
+ public int adminCommand(CommandLine commandLine, FalconClient client,
+ String falconUrl) throws FalconCLIException, IOException {
+ String result;
+ Set<String> optionsList = new HashSet<String>();
+ for (Option option : commandLine.getOptions()) {
+ optionsList.add(option.getOpt());
+ }
+
+ String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
+
+ if (optionsList.contains(STACK_OPTION)) {
+ result = client.getThreadDump(doAsUser);
+ OUT.get().println(result);
+ }
+
+ int exitValue = 0;
+ if (optionsList.contains(STATUS_OPT)) {
+ try {
+ int status = client.getStatus(doAsUser);
+ if (status != 200) {
+ ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
+ + "Please check log files.");
+ exitValue = status;
+ } else {
+ OUT.get().println("Falcon server is running (on " + falconUrl + ")");
+ }
+ } catch (Exception e) {
+ ERR.get().println("Falcon server doesn't seem to be running on " + falconUrl);
+ exitValue = -1;
+ }
+ } else if (optionsList.contains(VERSION_OPT)) {
+ result = client.getVersion(doAsUser);
+ OUT.get().println("Falcon server build version: " + result);
+ } else if (optionsList.contains(HELP_CMD)) {
+ OUT.get().println("Falcon Help");
+ }
+ return exitValue;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 1574585..67420ec 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -20,35 +20,22 @@ package org.apache.falcon.cli;
import com.sun.jersey.api.client.ClientHandlerException;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.LifeCycle;
-import org.apache.falcon.ResponseHelper;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.EntityList;
-import org.apache.falcon.resource.FeedLookupResult;
-import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
-import org.apache.falcon.resource.SchedulableEntityInstanceResult;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -59,95 +46,55 @@ public class FalconCLI {
public static final AtomicReference<PrintStream> ERR = new AtomicReference<PrintStream>(System.err);
public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
- public static final String FALCON_URL = "FALCON_URL";
public static final String URL_OPTION = "url";
- public static final String VERSION_OPTION = "version";
- public static final String STATUS_OPTION = "status";
+ private static final String FALCON_URL = "FALCON_URL";
+
public static final String ADMIN_CMD = "admin";
public static final String HELP_CMD = "help";
public static final String METADATA_CMD = "metadata";
- private static final String VERSION_CMD = "version";
- private static final String STACK_OPTION = "stack";
-
public static final String ENTITY_CMD = "entity";
- public static final String ENTITY_TYPE_OPT = "type";
+ public static final String INSTANCE_CMD = "instance";
+ public static final String RECIPE_CMD = "recipe";
+
+ public static final String TYPE_OPT = "type";
public static final String COLO_OPT = "colo";
public static final String CLUSTER_OPT = "cluster";
public static final String ENTITY_NAME_OPT = "name";
public static final String FILE_PATH_OPT = "file";
- public static final String SUBMIT_OPT = "submit";
- public static final String UPDATE_OPT = "update";
+ public static final String VERSION_OPT = "version";
public static final String SCHEDULE_OPT = "schedule";
public static final String SUSPEND_OPT = "suspend";
public static final String RESUME_OPT = "resume";
- public static final String DELETE_OPT = "delete";
- public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
- public static final String VALIDATE_OPT = "validate";
public static final String STATUS_OPT = "status";
public static final String SUMMARY_OPT = "summary";
- public static final String DEFINITION_OPT = "definition";
public static final String DEPENDENCY_OPT = "dependency";
- public static final String SLA_MISS_ALERT_OPT = "slaAlert";
- public static final String LOOKUP_OPT = "lookup";
- public static final String PATH_OPT = "path";
public static final String LIST_OPT = "list";
- public static final String TOUCH_OPT = "touch";
public static final String SKIPDRYRUN_OPT = "skipDryRun";
- public static final String PROPS_OPT = "properties";
-
- public static final String FIELDS_OPT = "fields";
public static final String FILTER_BY_OPT = "filterBy";
- public static final String TAGS_OPT = "tags";
public static final String ORDER_BY_OPT = "orderBy";
public static final String SORT_ORDER_OPT = "sortOrder";
public static final String OFFSET_OPT = "offset";
public static final String NUM_RESULTS_OPT = "numResults";
- public static final String NUM_INSTANCES_OPT = "numInstances";
- public static final String NAMESEQ_OPT = "nameseq";
- public static final String TAGKEYS_OPT = "tagkeys";
- public static final String FORCE_RERUN_FLAG = "force";
-
- public static final String INSTANCE_CMD = "instance";
- public static final String INSTANCE_TIME_OPT = "instanceTime";
public static final String START_OPT = "start";
public static final String END_OPT = "end";
- public static final String RUNNING_OPT = "running";
- public static final String KILL_OPT = "kill";
- public static final String RERUN_OPT = "rerun";
- public static final String LOG_OPT = "logs";
- public static final String RUNID_OPT = "runid";
- public static final String CLUSTERS_OPT = "clusters";
- public static final String SOURCECLUSTER_OPT = "sourceClusters";
public static final String CURRENT_COLO = "current.colo";
public static final String CLIENT_PROPERTIES = "/client.properties";
- public static final String LIFECYCLE_OPT = "lifecycle";
- public static final String PARARMS_OPT = "params";
- public static final String LISTING_OPT = "listing";
- public static final String TRIAGE_OPT = "triage";
+ public static final String DO_AS_OPT = "doAs";
- // Recipe Command
- public static final String RECIPE_CMD = "recipe";
- public static final String RECIPE_NAME = "name";
- public static final String RECIPE_OPERATION= "operation";
- public static final String RECIPE_TOOL_CLASS_NAME = "tool";
+ private final Properties clientProperties;
+
+ public FalconCLI() throws Exception {
+ clientProperties = getClientProperties();
+ }
/**
* Recipe operation enum.
*/
- public static enum RecipeOperation {
+ public enum RecipeOperation {
HDFS_REPLICATION,
HIVE_DISASTER_RECOVERY
}
- private final Properties clientProperties;
-
- public FalconCLI() throws Exception {
- clientProperties = getClientProperties();
- }
-
- // doAs option
- public static final String DO_AS_OPT = "doAs";
-
/**
* Entry point for the Falcon CLI when invoked from the command line. Upon
* completion this method exits the JVM with '0' (success) or '-1'
@@ -175,23 +122,28 @@ public class FalconCLI {
* @param args options and arguments for the Oozie CLI.
* @return '0' (success), '-1' (failure).
*/
- public synchronized int run(final String[] args) {
+ public synchronized int run(final String[] args) throws Exception {
CLIParser parser = new CLIParser("falcon", FALCON_HELP);
+
+ FalconAdminCLI adminCLI = new FalconAdminCLI();
+ FalconEntityCLI entityCLI = new FalconEntityCLI();
+ FalconInstanceCLI instanceCLI = new FalconInstanceCLI();
FalconMetadataCLI metadataCLI = new FalconMetadataCLI();
+ FalconRecipeCLI recipeCLI = new FalconRecipeCLI();
- parser.addCommand(ADMIN_CMD, "", "admin operations", createAdminOptions(), true);
+ parser.addCommand(ADMIN_CMD, "", "admin operations", adminCLI.createAdminOptions(), true);
parser.addCommand(HELP_CMD, "", "display usage", new Options(), false);
- parser.addCommand(VERSION_CMD, "", "show client version", new Options(), false);
parser.addCommand(ENTITY_CMD, "",
"Entity operations like submit, suspend, resume, delete, status, definition, submitAndSchedule",
- entityOptions(), false);
+ entityCLI.createEntityOptions(), false);
parser.addCommand(INSTANCE_CMD, "",
"Process instances operations like running, status, kill, suspend, resume, rerun, logs",
- instanceOptions(), false);
+ instanceCLI.createInstanceOptions(), false);
parser.addCommand(METADATA_CMD, "", "Metadata operations like list, relations",
metadataCLI.createMetadataOptions(), true);
- parser.addCommand(RECIPE_CMD, "", "recipe operations", createRecipeOptions(), true);
+ parser.addCommand(RECIPE_CMD, "", "recipe operations", recipeCLI.createRecipeOptions(), true);
+ parser.addCommand(VERSION_OPT, "", "show client version", new Options(), false);
try {
CLIParser.Command command = parser.parse(args);
@@ -204,15 +156,15 @@ public class FalconCLI {
FalconClient client = new FalconClient(falconUrl, clientProperties);
if (command.getName().equals(ADMIN_CMD)) {
- exitValue = adminCommand(commandLine, client, falconUrl);
+ exitValue = adminCLI.adminCommand(commandLine, client, falconUrl);
} else if (command.getName().equals(ENTITY_CMD)) {
- entityCommand(commandLine, client);
+ entityCLI.entityCommand(commandLine, client);
} else if (command.getName().equals(INSTANCE_CMD)) {
- instanceCommand(commandLine, client);
+ instanceCLI.instanceCommand(commandLine, client);
} else if (command.getName().equals(METADATA_CMD)) {
metadataCLI.metadataCommand(commandLine, client);
} else if (command.getName().equals(RECIPE_CMD)) {
- recipeCommand(commandLine, client);
+ recipeCLI.recipeCommand(commandLine, client);
}
}
return exitValue;
@@ -239,131 +191,7 @@ public class FalconCLI {
}
}
- private void instanceCommand(CommandLine commandLine, FalconClient client)
- throws FalconCLIException, IOException {
- Set<String> optionsList = new HashSet<String>();
- for (Option option : commandLine.getOptions()) {
- optionsList.add(option.getOpt());
- }
-
- String result;
- String type = commandLine.getOptionValue(ENTITY_TYPE_OPT);
- String entity = commandLine.getOptionValue(ENTITY_NAME_OPT);
- String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT);
- String start = commandLine.getOptionValue(START_OPT);
- String end = commandLine.getOptionValue(END_OPT);
- String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
- String runId = commandLine.getOptionValue(RUNID_OPT);
- String colo = commandLine.getOptionValue(COLO_OPT);
- String clusters = commandLine.getOptionValue(CLUSTERS_OPT);
- String sourceClusters = commandLine.getOptionValue(SOURCECLUSTER_OPT);
- List<LifeCycle> lifeCycles = getLifeCycle(commandLine.getOptionValue(LIFECYCLE_OPT));
- String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
- String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
- String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
- String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
- Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
- Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT), null, "numResults");
-
- colo = getColo(colo);
- String instanceAction = "instance";
- validateSortOrder(sortOrder);
- validateInstanceCommands(optionsList, entity, type, colo);
-
- if (optionsList.contains(TRIAGE_OPT)) {
- validateNotEmpty(colo, COLO_OPT);
- validateNotEmpty(start, START_OPT);
- validateNotEmpty(type, ENTITY_TYPE_OPT);
- validateEntityTypeForSummary(type);
- validateNotEmpty(entity, ENTITY_NAME_OPT);
- result = client.triage(type, entity, start, colo).toString();
- } else if (optionsList.contains(DEPENDENCY_OPT)) {
- validateNotEmpty(instanceTime, INSTANCE_TIME_OPT);
- InstanceDependencyResult response = client.getInstanceDependencies(type, entity, instanceTime, colo);
- result = ResponseHelper.getString(response);
-
- } else if (optionsList.contains(RUNNING_OPT)) {
- validateOrderBy(orderBy, instanceAction);
- validateFilterBy(filterBy, instanceAction);
- result =
- ResponseHelper.getString(client.getRunningInstances(type,
- entity, colo, lifeCycles, filterBy, orderBy, sortOrder,
- offset, numResults, doAsUser));
- } else if (optionsList.contains(STATUS_OPT) || optionsList.contains(LIST_OPT)) {
- validateOrderBy(orderBy, instanceAction);
- validateFilterBy(filterBy, instanceAction);
- result =
- ResponseHelper.getString(client
- .getStatusOfInstances(type, entity, start, end, colo,
- lifeCycles,
- filterBy, orderBy, sortOrder, offset, numResults, doAsUser));
- } else if (optionsList.contains(SUMMARY_OPT)) {
- validateOrderBy(orderBy, "summary");
- validateFilterBy(filterBy, "summary");
- result =
- ResponseHelper.getString(client
- .getSummaryOfInstances(type, entity, start, end, colo,
- lifeCycles, filterBy, orderBy, sortOrder, doAsUser));
- } else if (optionsList.contains(KILL_OPT)) {
- validateNotEmpty(start, START_OPT);
- validateNotEmpty(end, END_OPT);
- result =
- ResponseHelper.getString(client
- .killInstances(type, entity, start, end, colo, clusters,
- sourceClusters, lifeCycles, doAsUser));
- } else if (optionsList.contains(SUSPEND_OPT)) {
- validateNotEmpty(start, START_OPT);
- validateNotEmpty(end, END_OPT);
- result =
- ResponseHelper.getString(client
- .suspendInstances(type, entity, start, end, colo, clusters,
- sourceClusters, lifeCycles, doAsUser));
- } else if (optionsList.contains(RESUME_OPT)) {
- validateNotEmpty(start, START_OPT);
- validateNotEmpty(end, END_OPT);
- result =
- ResponseHelper.getString(client
- .resumeInstances(type, entity, start, end, colo, clusters,
- sourceClusters, lifeCycles, doAsUser));
- } else if (optionsList.contains(RERUN_OPT)) {
- validateNotEmpty(start, START_OPT);
- validateNotEmpty(end, END_OPT);
- boolean isForced = false;
- if (optionsList.contains(FORCE_RERUN_FLAG)) {
- isForced = true;
- }
- result =
- ResponseHelper.getString(client
- .rerunInstances(type, entity, start, end, filePath, colo,
- clusters, sourceClusters,
- lifeCycles, isForced, doAsUser));
- } else if (optionsList.contains(LOG_OPT)) {
- validateOrderBy(orderBy, instanceAction);
- validateFilterBy(filterBy, instanceAction);
- result =
- ResponseHelper.getString(client
- .getLogsOfInstances(type, entity, start, end, colo, runId,
- lifeCycles,
- filterBy, orderBy, sortOrder, offset, numResults, doAsUser),
- runId);
- } else if (optionsList.contains(PARARMS_OPT)) {
- // start time is the nominal time of instance
- result =
- ResponseHelper
- .getString(client.getParamsOfInstance(
- type, entity, start, colo, lifeCycles, doAsUser));
- } else if (optionsList.contains(LISTING_OPT)) {
- result =
- ResponseHelper.getString(client
- .getFeedListing(type, entity, start, end, colo, doAsUser));
- } else {
- throw new FalconCLIException("Invalid command");
- }
-
- OUT.get().println(result);
- }
-
- private Integer parseIntegerInput(String optionValue, Integer defaultVal, String optionName)
+ protected Integer parseIntegerInput(String optionValue, Integer defaultVal, String optionName)
throws FalconCLIException {
Integer integer = defaultVal;
if (optionValue != null) {
@@ -377,187 +205,7 @@ public class FalconCLI {
return integer;
}
- private void validateInstanceCommands(Set<String> optionsList,
- String entity, String type,
- String colo) throws FalconCLIException {
-
- validateNotEmpty(entity, ENTITY_NAME_OPT);
- validateNotEmpty(type, ENTITY_TYPE_OPT);
- validateNotEmpty(colo, COLO_OPT);
-
- if (optionsList.contains(CLUSTERS_OPT)) {
- if (optionsList.contains(RUNNING_OPT)
- || optionsList.contains(LOG_OPT)
- || optionsList.contains(STATUS_OPT)
- || optionsList.contains(SUMMARY_OPT)) {
- throw new FalconCLIException("Invalid argument: clusters");
- }
- }
-
- if (optionsList.contains(SOURCECLUSTER_OPT)) {
- if (optionsList.contains(RUNNING_OPT)
- || optionsList.contains(LOG_OPT)
- || optionsList.contains(STATUS_OPT)
- || optionsList.contains(SUMMARY_OPT) || !type.equals("feed")) {
- throw new FalconCLIException("Invalid argument: sourceClusters");
- }
- }
-
- if (optionsList.contains(FORCE_RERUN_FLAG)) {
- if (!optionsList.contains(RERUN_OPT)) {
- throw new FalconCLIException("Force option can be used only with instance rerun");
- }
- }
- }
-
- private void entityCommand(CommandLine commandLine, FalconClient client)
- throws FalconCLIException, IOException {
- Set<String> optionsList = new HashSet<String>();
- for (Option option : commandLine.getOptions()) {
- optionsList.add(option.getOpt());
- }
-
- String result = null;
- String entityType = commandLine.getOptionValue(ENTITY_TYPE_OPT);
- String entityName = commandLine.getOptionValue(ENTITY_NAME_OPT);
- String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
- String colo = commandLine.getOptionValue(COLO_OPT);
- colo = getColo(colo);
- String cluster = commandLine.getOptionValue(CLUSTER_OPT);
- String start = commandLine.getOptionValue(START_OPT);
- String end = commandLine.getOptionValue(END_OPT);
- String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
- String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
- String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
- String filterTags = commandLine.getOptionValue(TAGS_OPT);
- String nameSubsequence = commandLine.getOptionValue(NAMESEQ_OPT);
- String tagKeywords = commandLine.getOptionValue(TAGKEYS_OPT);
- String fields = commandLine.getOptionValue(FIELDS_OPT);
- String feedInstancePath = commandLine.getOptionValue(PATH_OPT);
- Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
- Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
- null, "numResults");
- String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
-
- Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances");
- Boolean skipDryRun = null;
- if (optionsList.contains(SKIPDRYRUN_OPT)) {
- skipDryRun = true;
- }
-
- String userProps = commandLine.getOptionValue(PROPS_OPT);
-
- EntityType entityTypeEnum = null;
- if (optionsList.contains(LIST_OPT)) {
- if (entityType == null) {
- entityType = "";
- }
- if (StringUtils.isNotEmpty(entityType)) {
- String[] types = entityType.split(",");
- for (String type : types) {
- EntityType.getEnum(type);
- }
- }
- } else {
- validateNotEmpty(entityType, ENTITY_TYPE_OPT);
- entityTypeEnum = EntityType.getEnum(entityType);
- }
- validateSortOrder(sortOrder);
- String entityAction = "entity";
-
- if (optionsList.contains(SLA_MISS_ALERT_OPT)) {
- validateNotEmpty(entityType, ENTITY_TYPE_OPT);
- validateNotEmpty(start, START_OPT);
- parseDateString(start);
- parseDateString(end);
- SchedulableEntityInstanceResult response = client.getFeedSlaMissPendingAlerts(entityType,
- entityName, start, end, colo);
- result = ResponseHelper.getString(response);
- } else if (optionsList.contains(SUBMIT_OPT)) {
- validateNotEmpty(filePath, "file");
- validateColo(optionsList);
- result = client.submit(entityType, filePath, doAsUser).getMessage();
- } else if (optionsList.contains(LOOKUP_OPT)) {
- validateNotEmpty(feedInstancePath, PATH_OPT);
- FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath, doAsUser);
- result = ResponseHelper.getString(resp);
-
- } else if (optionsList.contains(UPDATE_OPT)) {
- validateNotEmpty(filePath, "file");
- validateColo(optionsList);
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
- } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
- validateNotEmpty(filePath, "file");
- validateColo(optionsList);
- result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage();
- } else if (optionsList.contains(VALIDATE_OPT)) {
- validateNotEmpty(filePath, "file");
- validateColo(optionsList);
- result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage();
- } else if (optionsList.contains(SCHEDULE_OPT)) {
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- colo = getColo(colo);
- result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser, userProps).getMessage();
- } else if (optionsList.contains(SUSPEND_OPT)) {
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- colo = getColo(colo);
- result = client.suspend(entityTypeEnum, entityName, colo, doAsUser).getMessage();
- } else if (optionsList.contains(RESUME_OPT)) {
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- colo = getColo(colo);
- result = client.resume(entityTypeEnum, entityName, colo, doAsUser).getMessage();
- } else if (optionsList.contains(DELETE_OPT)) {
- validateColo(optionsList);
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- result = client.delete(entityTypeEnum, entityName, doAsUser).getMessage();
- } else if (optionsList.contains(STATUS_OPT)) {
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- colo = getColo(colo);
- result =
- client.getStatus(entityTypeEnum, entityName, colo, doAsUser).getMessage();
- } else if (optionsList.contains(DEFINITION_OPT)) {
- validateColo(optionsList);
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- result = client.getDefinition(entityType, entityName, doAsUser).toString();
- } else if (optionsList.contains(DEPENDENCY_OPT)) {
- validateColo(optionsList);
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- result = client.getDependency(entityType, entityName, doAsUser).toString();
- } else if (optionsList.contains(LIST_OPT)) {
- validateColo(optionsList);
- validateEntityFields(fields);
- validateOrderBy(orderBy, entityAction);
- validateFilterBy(filterBy, entityAction);
- EntityList entityList = client.getEntityList(entityType, fields, nameSubsequence, tagKeywords,
- filterBy, filterTags, orderBy, sortOrder, offset, numResults, doAsUser);
- result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
- } else if (optionsList.contains(SUMMARY_OPT)) {
- validateEntityTypeForSummary(entityType);
- validateNotEmpty(cluster, CLUSTER_OPT);
- validateEntityFields(fields);
- validateFilterBy(filterBy, entityAction);
- validateOrderBy(orderBy, entityAction);
- result =
- ResponseHelper.getString(client
- .getEntitySummary(
- entityType, cluster, start, end, fields, filterBy,
- filterTags,
- orderBy, sortOrder, offset, numResults,
- numInstances, doAsUser));
- } else if (optionsList.contains(TOUCH_OPT)) {
- validateNotEmpty(entityName, ENTITY_NAME_OPT);
- colo = getColo(colo);
- result = client.touch(entityType, entityName, colo, skipDryRun, doAsUser).getMessage();
- } else if (optionsList.contains(HELP_CMD)) {
- OUT.get().println("Falcon Help");
- } else {
- throw new FalconCLIException("Invalid command");
- }
- OUT.get().println(result);
- }
-
- private void validateEntityTypeForSummary(String type) throws FalconCLIException {
+ protected void validateEntityTypeForSummary(String type) throws FalconCLIException {
EntityType entityType = EntityType.getEnum(type);
if (!entityType.isSchedulable()) {
throw new FalconCLIException("Invalid entity type " + entityType
@@ -565,14 +213,14 @@ public class FalconCLI {
}
}
- private void validateNotEmpty(String paramVal, String paramName) throws FalconCLIException {
- if (StringUtils.isEmpty(paramVal)) {
+ protected void validateNotEmpty(String paramVal, String paramName) throws FalconCLIException {
+ if (StringUtils.isBlank(paramVal)) {
throw new FalconCLIException("Missing argument : " + paramName);
}
}
- private void validateSortOrder(String sortOrder) throws FalconCLIException {
- if (!StringUtils.isEmpty(sortOrder)) {
+ protected void validateSortOrder(String sortOrder) throws FalconCLIException {
+ if (!StringUtils.isBlank(sortOrder)) {
if (!sortOrder.equalsIgnoreCase("asc") && !sortOrder.equalsIgnoreCase("desc")) {
throw new FalconCLIException("Value for param sortOrder should be \"asc\" or \"desc\". It is : "
+ sortOrder);
@@ -580,7 +228,7 @@ public class FalconCLI {
}
}
- private String getColo(String colo) throws FalconCLIException, IOException {
+ protected String getColo(String colo) throws FalconCLIException, IOException {
if (colo == null) {
Properties prop = getClientProperties();
colo = prop.getProperty(CURRENT_COLO, "*");
@@ -588,29 +236,7 @@ public class FalconCLI {
return colo;
}
- private void validateColo(Set<String> optionsList)
- throws FalconCLIException {
-
- if (optionsList.contains(COLO_OPT)) {
- throw new FalconCLIException("Invalid argument : " + COLO_OPT);
- }
- }
-
- private void validateEntityFields(String fields) throws FalconCLIException {
- if (StringUtils.isEmpty(fields)) {
- return;
- }
- String[] fieldsList = fields.split(",");
- for (String s : fieldsList) {
- try {
- EntityList.EntityFieldList.valueOf(s.toUpperCase());
- } catch (IllegalArgumentException ie) {
- throw new FalconCLIException("Invalid fields argument : " + FIELDS_OPT);
- }
- }
- }
-
- private void validateFilterBy(String filterBy, String filterType) throws FalconCLIException {
+ protected void validateFilterBy(String filterBy, String filterType) throws FalconCLIException {
if (StringUtils.isEmpty(filterBy)) {
return;
}
@@ -633,8 +259,8 @@ public class FalconCLI {
}
}
- private void validateOrderBy(String orderBy, String action) throws FalconCLIException {
- if (StringUtils.isEmpty(orderBy)) {
+ protected void validateOrderBy(String orderBy, String action) throws FalconCLIException {
+ if (StringUtils.isBlank(orderBy)) {
return;
}
if (action.equals("instance")) {
@@ -655,325 +281,6 @@ public class FalconCLI {
throw new FalconCLIException("Invalid orderBy argument : " + orderBy);
}
-
- private Date parseDateString(String time) throws FalconCLIException {
- if (time != null && !time.isEmpty()) {
- try {
- return SchemaHelper.parseDateUTC(time);
- } catch(Exception e) {
- throw new FalconCLIException("Time " + time + " is not valid", e);
- }
- }
- return null;
- }
-
- private Options createAdminOptions() {
- Options adminOptions = new Options();
- Option url = new Option(URL_OPTION, true, "Falcon URL");
- adminOptions.addOption(url);
-
- OptionGroup group = new OptionGroup();
- Option status = new Option(STATUS_OPTION, false,
- "show the current system status");
- Option version = new Option(VERSION_OPTION, false,
- "show Falcon server build version");
- Option stack = new Option(STACK_OPTION, false,
- "show the thread stack dump");
- Option doAs = new Option(DO_AS_OPT, true,
- "doAs user");
- Option help = new Option("help", false, "show Falcon help");
- group.addOption(status);
- group.addOption(version);
- group.addOption(stack);
- group.addOption(help);
-
- adminOptions.addOptionGroup(group);
- adminOptions.addOption(doAs);
- return adminOptions;
- }
-
- private Options entityOptions() {
-
- Options entityOptions = new Options();
-
- Option submit = new Option(SUBMIT_OPT, false,
- "Submits an entity xml to Falcon");
- Option update = new Option(UPDATE_OPT, false,
- "Updates an existing entity xml");
- Option schedule = new Option(SCHEDULE_OPT, false,
- "Schedules a submited entity in Falcon");
- Option suspend = new Option(SUSPEND_OPT, false,
- "Suspends a running entity in Falcon");
- Option resume = new Option(RESUME_OPT, false,
- "Resumes a suspended entity in Falcon");
- Option delete = new Option(DELETE_OPT, false,
- "Deletes an entity in Falcon, and kills its instance from workflow engine");
- Option submitAndSchedule = new Option(SUBMIT_AND_SCHEDULE_OPT, false,
- "Submits and entity to Falcon and schedules it immediately");
- Option validate = new Option(VALIDATE_OPT, false,
- "Validates an entity based on the entity type");
- Option status = new Option(STATUS_OPT, false,
- "Gets the status of entity");
- Option definition = new Option(DEFINITION_OPT, false,
- "Gets the Definition of entity");
- Option dependency = new Option(DEPENDENCY_OPT, false,
- "Gets the dependencies of entity");
- Option list = new Option(LIST_OPT, false,
- "List entities registered for a type");
- Option lookup = new Option(LOOKUP_OPT, false, "Lookup a feed given its instance's path");
- Option slaAlert = new Option(SLA_MISS_ALERT_OPT, false, "Get missing feed instances which missed SLA");
- Option entitySummary = new Option(SUMMARY_OPT, false,
- "Get summary of instances for list of entities");
- Option touch = new Option(TOUCH_OPT, false,
- "Force update the entity in workflow engine(even without any changes to entity)");
-
- OptionGroup group = new OptionGroup();
- group.addOption(submit);
- group.addOption(update);
- group.addOption(schedule);
- group.addOption(suspend);
- group.addOption(resume);
- group.addOption(delete);
- group.addOption(submitAndSchedule);
- group.addOption(validate);
- group.addOption(status);
- group.addOption(definition);
- group.addOption(dependency);
- group.addOption(list);
- group.addOption(lookup);
- group.addOption(slaAlert);
- group.addOption(entitySummary);
- group.addOption(touch);
-
- Option url = new Option(URL_OPTION, true, "Falcon URL");
- Option entityType = new Option(ENTITY_TYPE_OPT, true,
- "Entity type, can be cluster, feed or process xml");
- Option filePath = new Option(FILE_PATH_OPT, true,
- "Path to entity xml file");
- Option entityName = new Option(ENTITY_NAME_OPT, true,
- "Entity type, can be cluster, feed or process xml");
- Option start = new Option(START_OPT, true, "Start time is optional for summary");
- Option end = new Option(END_OPT, true, "End time is optional for summary");
- Option colo = new Option(COLO_OPT, true, "Colo name");
- Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
- colo.setRequired(false);
- Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
- Option filterBy = new Option(FILTER_BY_OPT, true,
- "Filter returned entities by the specified status");
- Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags");
- Option nameSubsequence = new Option(NAMESEQ_OPT, true, "Subsequence of entity name");
- Option tagKeywords = new Option(TAGKEYS_OPT, true, "Keywords in tags");
- Option orderBy = new Option(ORDER_BY_OPT, true,
- "Order returned entities by this field");
- Option sortOrder = new Option(SORT_ORDER_OPT, true, "asc or desc order for results");
- Option offset = new Option(OFFSET_OPT, true,
- "Start returning entities from this offset");
- Option numResults = new Option(NUM_RESULTS_OPT, true,
- "Number of results to return per request");
- Option numInstances = new Option(NUM_INSTANCES_OPT, true,
- "Number of instances to return per entity summary request");
- Option path = new Option(PATH_OPT, true, "Path for a feed's instance");
- Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine");
- Option doAs = new Option(DO_AS_OPT, true, "doAs user");
- Option userProps = new Option(PROPS_OPT, true, "User supplied comma separated key value properties");
-
- entityOptions.addOption(url);
- entityOptions.addOption(path);
- entityOptions.addOptionGroup(group);
- entityOptions.addOption(entityType);
- entityOptions.addOption(entityName);
- entityOptions.addOption(filePath);
- entityOptions.addOption(colo);
- entityOptions.addOption(cluster);
- entityOptions.addOption(start);
- entityOptions.addOption(end);
- entityOptions.addOption(fields);
- entityOptions.addOption(filterBy);
- entityOptions.addOption(filterTags);
- entityOptions.addOption(nameSubsequence);
- entityOptions.addOption(tagKeywords);
- entityOptions.addOption(orderBy);
- entityOptions.addOption(sortOrder);
- entityOptions.addOption(offset);
- entityOptions.addOption(numResults);
- entityOptions.addOption(numInstances);
- entityOptions.addOption(skipDryRun);
- entityOptions.addOption(doAs);
- entityOptions.addOption(userProps);
-
- return entityOptions;
- }
-
- private Options instanceOptions() {
-
- Options instanceOptions = new Options();
-
- Option running = new Option(RUNNING_OPT, false,
- "Gets running process instances for a given process");
- Option list = new Option(LIST_OPT, false,
- "Gets all instances for a given process in the range start time and optional end time");
- Option status = new Option(
- STATUS_OPT,
- false,
- "Gets status of process instances for a given process in the range start time and optional end time");
- Option summary = new Option(
- SUMMARY_OPT,
- false,
- "Gets summary of instances for a given process in the range start time and optional end time");
- Option kill = new Option(
- KILL_OPT,
- false,
- "Kills active process instances for a given process in the range start time and optional end time");
- Option suspend = new Option(
- SUSPEND_OPT,
- false,
- "Suspends active process instances for a given process in the range start time and optional end time");
- Option resume = new Option(
- RESUME_OPT,
- false,
- "Resumes suspended process instances for a given process "
- + "in the range start time and optional end time");
- Option rerun = new Option(
- RERUN_OPT,
- false,
- "Reruns process instances for a given process in the range start time and "
- + "optional end time and overrides properties present in job.properties file");
-
- Option logs = new Option(
- LOG_OPT,
- false,
- "Logs print the logs for process instances for a given process in "
- + "the range start time and optional end time");
-
- Option params = new Option(
- PARARMS_OPT,
- false,
- "Displays the workflow parameters for a given instance of specified nominal time"
- + "start time represents nominal time and end time is not considered");
-
- Option listing = new Option(
- LISTING_OPT,
- false,
- "Displays feed listing and their status between a start and end time range.");
-
- Option dependency = new Option(
- DEPENDENCY_OPT,
- false,
- "Displays dependent instances for a specified instance.");
-
- Option triage = new Option(TRIAGE_OPT, false,
- "Triage a feed or process instance and find the failures in it's lineage.");
-
- OptionGroup group = new OptionGroup();
- group.addOption(running);
- group.addOption(list);
- group.addOption(status);
- group.addOption(summary);
- group.addOption(kill);
- group.addOption(resume);
- group.addOption(suspend);
- group.addOption(resume);
- group.addOption(rerun);
- group.addOption(logs);
- group.addOption(params);
- group.addOption(listing);
- group.addOption(dependency);
- group.addOption(triage);
-
- Option url = new Option(URL_OPTION, true, "Falcon URL");
- Option start = new Option(START_OPT, true,
- "Start time is required for commands, status, kill, suspend, resume and re-run"
- + "and it is nominal time while displaying workflow params");
- Option end = new Option(
- END_OPT,
- true,
- "End time is optional for commands, status, kill, suspend, resume and re-run; "
- + "if not specified then current time is considered as end time");
- Option runid = new Option(RUNID_OPT, true,
- "Instance runid is optional and user can specify the runid, defaults to 0");
- Option clusters = new Option(CLUSTERS_OPT, true,
- "clusters is optional for commands kill, suspend and resume, "
- + "should not be specified for other commands");
- Option sourceClusters = new Option(SOURCECLUSTER_OPT, true,
- " source cluster is optional for commands kill, suspend and resume, "
- + "should not be specified for other commands (required for only feed)");
- Option filePath = new Option(
- FILE_PATH_OPT,
- true,
- "Path to job.properties file is required for rerun command, "
- + "it should contain name=value pair for properties to override for rerun");
- Option entityType = new Option(ENTITY_TYPE_OPT, true,
- "Entity type, can be feed or process xml");
- Option entityName = new Option(ENTITY_NAME_OPT, true,
- "Entity name, can be feed or process name");
- Option colo = new Option(COLO_OPT, true,
- "Colo on which the cmd has to be executed");
- Option lifecycle = new Option(LIFECYCLE_OPT,
- true,
- "describes life cycle of entity , for feed it can be replication/retention "
- + "and for process it can be execution");
- Option filterBy = new Option(FILTER_BY_OPT, true,
- "Filter returned instances by the specified fields");
- Option orderBy = new Option(ORDER_BY_OPT, true,
- "Order returned instances by this field");
- Option sortOrder = new Option(SORT_ORDER_OPT, true, "asc or desc order for results");
- Option offset = new Option(OFFSET_OPT, true,
- "Start returning instances from this offset");
- Option numResults = new Option(NUM_RESULTS_OPT, true,
- "Number of results to return per request");
- Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
- "Flag to forcefully rerun entire workflow of an instance");
- Option doAs = new Option(DO_AS_OPT, true, "doAs user");
-
- Option instanceTime = new Option(INSTANCE_TIME_OPT, true, "Time for an instance");
-
- instanceOptions.addOption(url);
- instanceOptions.addOptionGroup(group);
- instanceOptions.addOption(start);
- instanceOptions.addOption(end);
- instanceOptions.addOption(filePath);
- instanceOptions.addOption(entityType);
- instanceOptions.addOption(entityName);
- instanceOptions.addOption(runid);
- instanceOptions.addOption(clusters);
- instanceOptions.addOption(sourceClusters);
- instanceOptions.addOption(colo);
- instanceOptions.addOption(lifecycle);
- instanceOptions.addOption(filterBy);
- instanceOptions.addOption(offset);
- instanceOptions.addOption(orderBy);
- instanceOptions.addOption(sortOrder);
- instanceOptions.addOption(numResults);
- instanceOptions.addOption(forceRerun);
- instanceOptions.addOption(doAs);
- instanceOptions.addOption(instanceTime);
-
- return instanceOptions;
- }
-
- private Options createRecipeOptions() {
- Options recipeOptions = new Options();
- Option url = new Option(URL_OPTION, true, "Falcon URL");
- recipeOptions.addOption(url);
-
- Option recipeFileOpt = new Option(RECIPE_NAME, true, "recipe name");
- recipeOptions.addOption(recipeFileOpt);
-
- Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class");
- recipeOptions.addOption(recipeToolClassName);
-
- Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation");
- recipeOptions.addOption(recipeOperation);
-
- Option skipDryRunOperation = new Option(SKIPDRYRUN_OPT, false, "skip dryrun operation");
- recipeOptions.addOption(skipDryRunOperation);
-
- Option doAs = new Option(DO_AS_OPT, true, "doAs user");
- recipeOptions.addOption(doAs);
-
- return recipeOptions;
- }
-
protected String getFalconEndpoint(CommandLine commandLine) throws FalconCLIException, IOException {
String url = commandLine.getOptionValue(URL_OPTION);
if (url == null) {
@@ -991,45 +298,6 @@ public class FalconCLI {
return url;
}
- private int adminCommand(CommandLine commandLine, FalconClient client,
- String falconUrl) throws FalconCLIException, IOException {
- String result;
- Set<String> optionsList = new HashSet<String>();
- for (Option option : commandLine.getOptions()) {
- optionsList.add(option.getOpt());
- }
-
- String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
-
- if (optionsList.contains(STACK_OPTION)) {
- result = client.getThreadDump(doAsUser);
- OUT.get().println(result);
- }
- int exitValue = 0;
- if (optionsList.contains(STATUS_OPTION)) {
- try {
- int status = client.getStatus(doAsUser);
- if (status != 200) {
- ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
- + "Please check log files.");
- exitValue = status;
- } else {
- OUT.get().println("Falcon server is running (on " + falconUrl + ")");
- }
- } catch (Exception e) {
- ERR.get().println("Falcon server doesn't seem to be running on " + falconUrl);
- exitValue = -1;
- }
- } else if (optionsList.contains(VERSION_OPTION)) {
- result = client.getVersion(doAsUser);
- OUT.get().println("Falcon server build version: " + result);
- } else if (optionsList.contains(HELP_CMD)) {
- OUT.get().println("Falcon Help");
- }
-
- return exitValue;
- }
-
private Properties getClientProperties() throws IOException {
InputStream inputStream = null;
try {
@@ -1043,55 +311,4 @@ public class FalconCLI {
IOUtils.closeQuietly(inputStream);
}
}
-
- public static List<LifeCycle> getLifeCycle(String lifeCycleValue) throws FalconCLIException {
-
- if (lifeCycleValue != null) {
- String[] lifeCycleValues = lifeCycleValue.split(",");
- List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
- try {
- for (String lifeCycle : lifeCycleValues) {
- lifeCycles.add(LifeCycle.valueOf(lifeCycle.toUpperCase().trim()));
- }
- } catch (IllegalArgumentException e) {
- throw new FalconCLIException("Invalid life cycle values: " + lifeCycles, e);
- }
- return lifeCycles;
- }
- return null;
- }
-
- private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
- Set<String> optionsList = new HashSet<String>();
- for (Option option : commandLine.getOptions()) {
- optionsList.add(option.getOpt());
- }
-
- String recipeName = commandLine.getOptionValue(RECIPE_NAME);
- String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
- String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
- String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
-
- validateNotEmpty(recipeName, RECIPE_NAME);
- validateNotEmpty(recipeOperation, RECIPE_OPERATION);
- validateRecipeOperations(recipeOperation);
- Boolean skipDryRun = null;
- if (optionsList.contains(SKIPDRYRUN_OPT)) {
- skipDryRun = true;
- }
-
- String result = client.submitRecipe(recipeName, recipeToolClass,
- recipeOperation, skipDryRun, doAsUser).toString();
- OUT.get().println(result);
- }
-
- private static void validateRecipeOperations(String recipeOperation) throws FalconCLIException {
- for(RecipeOperation operation : RecipeOperation.values()) {
- if (operation.toString().equalsIgnoreCase(recipeOperation)) {
- return;
- }
- }
- throw new FalconCLIException("Allowed Recipe operations: "
- + java.util.Arrays.asList((RecipeOperation.values())));
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/client/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
new file mode 100644
index 0000000..98f3176
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.ResponseHelper;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconClient;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.resource.EntityList;
+import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Entity extension to Falcon Command Line Interface - wraps the RESTful API for entities.
+ */
+public class FalconEntityCLI extends FalconCLI {
+
+ private static final String SUBMIT_OPT = "submit";
+ private static final String UPDATE_OPT = "update";
+ private static final String DELETE_OPT = "delete";
+ private static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
+ private static final String VALIDATE_OPT = "validate";
+ private static final String DEFINITION_OPT = "definition";
+ public static final String SLA_MISS_ALERT_OPT = "slaAlert";
+
+ private static final String LOOKUP_OPT = "lookup";
+ private static final String PATH_OPT = "path";
+ private static final String TOUCH_OPT = "touch";
+ private static final String PROPS_OPT = "properties";
+ private static final String FIELDS_OPT = "fields";
+ private static final String TAGS_OPT = "tags";
+ private static final String NUM_INSTANCES_OPT = "numInstances";
+ private static final String NAMESEQ_OPT = "nameseq";
+ private static final String TAGKEYS_OPT = "tagkeys";
+
+ public FalconEntityCLI() throws Exception {
+ super();
+ }
+
+ public Options createEntityOptions() {
+
+ Options entityOptions = new Options();
+
+ Option submit = new Option(SUBMIT_OPT, false,
+ "Submits an entity xml to Falcon");
+ Option update = new Option(UPDATE_OPT, false,
+ "Updates an existing entity xml");
+ Option schedule = new Option(SCHEDULE_OPT, false,
+ "Schedules a submited entity in Falcon");
+ Option suspend = new Option(SUSPEND_OPT, false,
+ "Suspends a running entity in Falcon");
+ Option resume = new Option(RESUME_OPT, false,
+ "Resumes a suspended entity in Falcon");
+ Option delete = new Option(DELETE_OPT, false,
+ "Deletes an entity in Falcon, and kills its instance from workflow engine");
+ Option submitAndSchedule = new Option(SUBMIT_AND_SCHEDULE_OPT, false,
+ "Submits and entity to Falcon and schedules it immediately");
+ Option validate = new Option(VALIDATE_OPT, false,
+ "Validates an entity based on the entity type");
+ Option status = new Option(STATUS_OPT, false,
+ "Gets the status of entity");
+ Option definition = new Option(DEFINITION_OPT, false,
+ "Gets the Definition of entity");
+ Option dependency = new Option(DEPENDENCY_OPT, false,
+ "Gets the dependencies of entity");
+ Option list = new Option(LIST_OPT, false,
+ "List entities registered for a type");
+ Option lookup = new Option(LOOKUP_OPT, false, "Lookup a feed given its instance's path");
+ Option slaAlert = new Option(SLA_MISS_ALERT_OPT, false, "Get missing feed instances which missed SLA");
+ Option entitySummary = new Option(SUMMARY_OPT, false,
+ "Get summary of instances for list of entities");
+ Option touch = new Option(TOUCH_OPT, false,
+ "Force update the entity in workflow engine(even without any changes to entity)");
+
+ OptionGroup group = new OptionGroup();
+ group.addOption(submit);
+ group.addOption(update);
+ group.addOption(schedule);
+ group.addOption(suspend);
+ group.addOption(resume);
+ group.addOption(delete);
+ group.addOption(submitAndSchedule);
+ group.addOption(validate);
+ group.addOption(status);
+ group.addOption(definition);
+ group.addOption(dependency);
+ group.addOption(list);
+ group.addOption(lookup);
+ group.addOption(slaAlert);
+ group.addOption(entitySummary);
+ group.addOption(touch);
+
+ Option url = new Option(URL_OPTION, true, "Falcon URL");
+ Option entityType = new Option(TYPE_OPT, true,
+ "Entity type, can be cluster, feed or process xml");
+ Option filePath = new Option(FILE_PATH_OPT, true,
+ "Path to entity xml file");
+ Option entityName = new Option(ENTITY_NAME_OPT, true,
+ "Entity type, can be cluster, feed or process xml");
+ Option start = new Option(START_OPT, true, "Start time is optional for summary");
+ Option end = new Option(END_OPT, true, "End time is optional for summary");
+ Option colo = new Option(COLO_OPT, true, "Colo name");
+ Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
+ colo.setRequired(false);
+ Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
+ Option filterBy = new Option(FILTER_BY_OPT, true,
+ "Filter returned entities by the specified status");
+ Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags");
+ Option nameSubsequence = new Option(NAMESEQ_OPT, true, "Subsequence of entity name");
+ Option tagKeywords = new Option(TAGKEYS_OPT, true, "Keywords in tags");
+ Option orderBy = new Option(ORDER_BY_OPT, true,
+ "Order returned entities by this field");
+ Option sortOrder = new Option(SORT_ORDER_OPT, true, "asc or desc order for results");
+ Option offset = new Option(OFFSET_OPT, true,
+ "Start returning entities from this offset");
+ Option numResults = new Option(NUM_RESULTS_OPT, true,
+ "Number of results to return per request");
+ Option numInstances = new Option(NUM_INSTANCES_OPT, true,
+ "Number of instances to return per entity summary request");
+ Option path = new Option(PATH_OPT, true, "Path for a feed's instance");
+ Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine");
+ Option doAs = new Option(DO_AS_OPT, true, "doAs user");
+ Option userProps = new Option(PROPS_OPT, true, "User supplied comma separated key value properties");
+
+ entityOptions.addOption(url);
+ entityOptions.addOption(path);
+ entityOptions.addOptionGroup(group);
+ entityOptions.addOption(entityType);
+ entityOptions.addOption(entityName);
+ entityOptions.addOption(filePath);
+ entityOptions.addOption(colo);
+ entityOptions.addOption(cluster);
+ entityOptions.addOption(start);
+ entityOptions.addOption(end);
+ entityOptions.addOption(fields);
+ entityOptions.addOption(filterBy);
+ entityOptions.addOption(filterTags);
+ entityOptions.addOption(nameSubsequence);
+ entityOptions.addOption(tagKeywords);
+ entityOptions.addOption(orderBy);
+ entityOptions.addOption(sortOrder);
+ entityOptions.addOption(offset);
+ entityOptions.addOption(numResults);
+ entityOptions.addOption(numInstances);
+ entityOptions.addOption(skipDryRun);
+ entityOptions.addOption(doAs);
+ entityOptions.addOption(userProps);
+
+ return entityOptions;
+ }
+
+ public void entityCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException, IOException {
+ Set<String> optionsList = new HashSet<String>();
+ for (Option option : commandLine.getOptions()) {
+ optionsList.add(option.getOpt());
+ }
+
+ String result = null;
+ String entityType = commandLine.getOptionValue(TYPE_OPT);
+ String entityName = commandLine.getOptionValue(ENTITY_NAME_OPT);
+ String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
+ String colo = commandLine.getOptionValue(COLO_OPT);
+ colo = getColo(colo);
+ String cluster = commandLine.getOptionValue(CLUSTER_OPT);
+ String start = commandLine.getOptionValue(START_OPT);
+ String end = commandLine.getOptionValue(END_OPT);
+ String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
+ String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
+ String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
+ String filterTags = commandLine.getOptionValue(TAGS_OPT);
+ String nameSubsequence = commandLine.getOptionValue(NAMESEQ_OPT);
+ String tagKeywords = commandLine.getOptionValue(TAGKEYS_OPT);
+ String fields = commandLine.getOptionValue(FIELDS_OPT);
+ String feedInstancePath = commandLine.getOptionValue(PATH_OPT);
+ Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
+ Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
+ null, "numResults");
+ String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
+
+ Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances");
+ Boolean skipDryRun = null;
+ if (optionsList.contains(SKIPDRYRUN_OPT)) {
+ skipDryRun = true;
+ }
+
+ String userProps = commandLine.getOptionValue(PROPS_OPT);
+
+ EntityType entityTypeEnum = null;
+ if (optionsList.contains(LIST_OPT)) {
+ if (entityType == null) {
+ entityType = "";
+ }
+ if (StringUtils.isNotEmpty(entityType)) {
+ String[] types = entityType.split(",");
+ for (String type : types) {
+ EntityType.getEnum(type);
+ }
+ }
+ } else {
+ validateNotEmpty(entityType, TYPE_OPT);
+ entityTypeEnum = EntityType.getEnum(entityType);
+ }
+ validateSortOrder(sortOrder);
+ String entityAction = "entity";
+
+ if (optionsList.contains(SLA_MISS_ALERT_OPT)) {
+ validateNotEmpty(entityType, TYPE_OPT);
+ validateNotEmpty(start, START_OPT);
+ parseDateString(start);
+ parseDateString(end);
+ SchedulableEntityInstanceResult response = client.getFeedSlaMissPendingAlerts(entityType,
+ entityName, start, end, colo);
+ result = ResponseHelper.getString(response);
+ } else if (optionsList.contains(SUBMIT_OPT)) {
+ validateNotEmpty(filePath, "file");
+ validateColo(optionsList);
+ result = client.submit(entityType, filePath, doAsUser).getMessage();
+ } else if (optionsList.contains(LOOKUP_OPT)) {
+ validateNotEmpty(feedInstancePath, PATH_OPT);
+ FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath, doAsUser);
+ result = ResponseHelper.getString(resp);
+ } else if (optionsList.contains(UPDATE_OPT)) {
+ validateNotEmpty(filePath, "file");
+ validateColo(optionsList);
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
+ } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
+ validateNotEmpty(filePath, "file");
+ validateColo(optionsList);
+ result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage();
+ } else if (optionsList.contains(VALIDATE_OPT)) {
+ validateNotEmpty(filePath, "file");
+ validateColo(optionsList);
+ result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage();
+ } else if (optionsList.contains(SCHEDULE_OPT)) {
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ colo = getColo(colo);
+ result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser, userProps).getMessage();
+ } else if (optionsList.contains(SUSPEND_OPT)) {
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ colo = getColo(colo);
+ result = client.suspend(entityTypeEnum, entityName, colo, doAsUser).getMessage();
+ } else if (optionsList.contains(RESUME_OPT)) {
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ colo = getColo(colo);
+ result = client.resume(entityTypeEnum, entityName, colo, doAsUser).getMessage();
+ } else if (optionsList.contains(DELETE_OPT)) {
+ validateColo(optionsList);
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ result = client.delete(entityTypeEnum, entityName, doAsUser).getMessage();
+ } else if (optionsList.contains(STATUS_OPT)) {
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ colo = getColo(colo);
+ result = client.getStatus(entityTypeEnum, entityName, colo, doAsUser).getMessage();
+ } else if (optionsList.contains(DEFINITION_OPT)) {
+ validateColo(optionsList);
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ result = client.getDefinition(entityType, entityName, doAsUser).toString();
+ } else if (optionsList.contains(DEPENDENCY_OPT)) {
+ validateColo(optionsList);
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ result = client.getDependency(entityType, entityName, doAsUser).toString();
+ } else if (optionsList.contains(LIST_OPT)) {
+ validateColo(optionsList);
+ validateEntityFields(fields);
+ validateOrderBy(orderBy, entityAction);
+ validateFilterBy(filterBy, entityAction);
+ EntityList entityList = client.getEntityList(entityType, fields, nameSubsequence, tagKeywords,
+ filterBy, filterTags, orderBy, sortOrder, offset, numResults, doAsUser);
+ result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
+ } else if (optionsList.contains(SUMMARY_OPT)) {
+ validateEntityTypeForSummary(entityType);
+ validateNotEmpty(cluster, CLUSTER_OPT);
+ validateEntityFields(fields);
+ validateFilterBy(filterBy, entityAction);
+ validateOrderBy(orderBy, entityAction);
+ result = ResponseHelper.getString(client.getEntitySummary(
+ entityType, cluster, start, end, fields, filterBy, filterTags,
+ orderBy, sortOrder, offset, numResults, numInstances, doAsUser));
+ } else if (optionsList.contains(TOUCH_OPT)) {
+ validateNotEmpty(entityName, ENTITY_NAME_OPT);
+ colo = getColo(colo);
+ result = client.touch(entityType, entityName, colo, skipDryRun, doAsUser).getMessage();
+ } else if (optionsList.contains(HELP_CMD)) {
+ OUT.get().println("Falcon Help");
+ } else {
+ throw new FalconCLIException("Invalid command");
+ }
+ OUT.get().println(result);
+ }
+
+ private void validateColo(Set<String> optionsList) throws FalconCLIException {
+ if (optionsList.contains(COLO_OPT)) {
+ throw new FalconCLIException("Invalid argument : " + COLO_OPT);
+ }
+ }
+
+ private void validateEntityFields(String fields) throws FalconCLIException {
+ if (StringUtils.isEmpty(fields)) {
+ return;
+ }
+ String[] fieldsList = fields.split(",");
+ for (String s : fieldsList) {
+ try {
+ EntityList.EntityFieldList.valueOf(s.toUpperCase());
+ } catch (IllegalArgumentException ie) {
+ throw new FalconCLIException("Invalid fields argument : " + FIELDS_OPT);
+ }
+ }
+ }
+
+ private Date parseDateString(String time) throws FalconCLIException {
+ if (time != null && !time.isEmpty()) {
+ try {
+ return SchemaHelper.parseDateUTC(time);
+ } catch(Exception e) {
+ throw new FalconCLIException("Time " + time + " is not valid", e);
+ }
+ }
+ return null;
+ }
+
+}