You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/04/22 18:04:53 UTC
falcon git commit: FALCON-1897 Extension Job Management: CLI support
Repository: falcon
Updated Branches:
refs/heads/master 897aa5fd1 -> 12675022a
FALCON-1897 Extension Job Management: CLI support
Note: After the code refactoring, a bug prevents the Falcon CLI from running. sowmyaramesh has a patch that fixes this and will send a pull request. All the CLI methods in this commit were tested with her patch applied.
Author: yzheng-hortonworks <yz...@hortonworks.com>
Reviewers: "Balu Vellanki <ba...@apache.org>"
Closes #103 from yzheng-hortonworks/FALCON-1897
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/12675022
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/12675022
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/12675022
Branch: refs/heads/master
Commit: 12675022a383ccdec250d2900776cdffc409c394
Parents: 897aa5f
Author: yzheng-hortonworks <yz...@hortonworks.com>
Authored: Fri Apr 22 09:04:48 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Fri Apr 22 09:04:48 2016 -0700
----------------------------------------------------------------------
.../org/apache/falcon/cli/FalconEntityCLI.java | 32 ++--
.../apache/falcon/cli/FalconExtensionCLI.java | 138 ++++++++++++++---
.../org/apache/falcon/FalconCLIConstants.java | 6 +
.../org/apache/falcon/client/FalconClient.java | 153 ++++++++++++++++++-
.../apache/falcon/resource/InstancesResult.java | 2 +-
5 files changed, 287 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
index 5c3d2a6..37a6992 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
@@ -43,11 +43,6 @@ import java.util.Set;
*/
public class FalconEntityCLI extends FalconCLI {
- private static final String SUBMIT_OPT = "submit";
- private static final String UPDATE_OPT = "update";
- private static final String DELETE_OPT = "delete";
- private static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
- private static final String VALIDATE_OPT = "validate";
private static final String DEFINITION_OPT = "definition";
public static final String SLA_MISS_ALERT_OPT = "slaAlert";
@@ -55,7 +50,6 @@ public class FalconEntityCLI extends FalconCLI {
private static final String PATH_OPT = "path";
private static final String TOUCH_OPT = "touch";
private static final String PROPS_OPT = "properties";
- private static final String FIELDS_OPT = "fields";
private static final String TAGS_OPT = "tags";
private static final String NUM_INSTANCES_OPT = "numInstances";
private static final String SHOWSCHEDULER_OPT = "showScheduler";
@@ -68,9 +62,9 @@ public class FalconEntityCLI extends FalconCLI {
Options entityOptions = new Options();
- Option submit = new Option(SUBMIT_OPT, false,
+ Option submit = new Option(FalconCLIConstants.SUBMIT_OPT, false,
"Submits an entity xml to Falcon");
- Option update = new Option(UPDATE_OPT, false,
+ Option update = new Option(FalconCLIConstants.UPDATE_OPT, false,
"Updates an existing entity xml");
Option schedule = new Option(FalconCLIConstants.SCHEDULE_OPT, false,
"Schedules a submited entity in Falcon");
@@ -78,11 +72,11 @@ public class FalconEntityCLI extends FalconCLI {
"Suspends a running entity in Falcon");
Option resume = new Option(FalconCLIConstants.RESUME_OPT, false,
"Resumes a suspended entity in Falcon");
- Option delete = new Option(DELETE_OPT, false,
+ Option delete = new Option(FalconCLIConstants.DELETE_OPT, false,
"Deletes an entity in Falcon, and kills its instance from workflow engine");
- Option submitAndSchedule = new Option(SUBMIT_AND_SCHEDULE_OPT, false,
+ Option submitAndSchedule = new Option(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT, false,
"Submits and entity to Falcon and schedules it immediately");
- Option validate = new Option(VALIDATE_OPT, false,
+ Option validate = new Option(FalconCLIConstants.VALIDATE_OPT, false,
"Validates an entity based on the entity type");
Option status = new Option(FalconCLIConstants.STATUS_OPT, false,
"Gets the status of entity");
@@ -129,7 +123,7 @@ public class FalconEntityCLI extends FalconCLI {
Option colo = new Option(FalconCLIConstants.COLO_OPT, true, "Colo name");
Option cluster = new Option(FalconCLIConstants.CLUSTER_OPT, true, "Cluster name");
colo.setRequired(false);
- Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
+ Option fields = new Option(FalconCLIConstants.FIELDS_OPT, true, "Entity fields to show for a request");
Option filterBy = new Option(FalconCLIConstants.FILTER_BY_OPT, true,
"Filter returned entities by the specified status");
Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags");
@@ -203,7 +197,7 @@ public class FalconEntityCLI extends FalconCLI {
String filterTags = commandLine.getOptionValue(TAGS_OPT);
String nameSubsequence = commandLine.getOptionValue(FalconCLIConstants.NAMESEQ_OPT);
String tagKeywords = commandLine.getOptionValue(FalconCLIConstants.TAGKEYS_OPT);
- String fields = commandLine.getOptionValue(FIELDS_OPT);
+ String fields = commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT);
String feedInstancePath = commandLine.getOptionValue(PATH_OPT);
Integer offset = parseIntegerInput(commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT), 0, "offset");
Integer numResults = parseIntegerInput(commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT),
@@ -248,7 +242,7 @@ public class FalconEntityCLI extends FalconCLI {
SchedulableEntityInstanceResult response = client.getFeedSlaMissPendingAlerts(entityType,
entityName, start, end, colo);
result = ResponseHelper.getString(response);
- } else if (optionsList.contains(SUBMIT_OPT)) {
+ } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
result = client.submit(entityType, filePath, doAsUser).getMessage();
@@ -256,16 +250,16 @@ public class FalconEntityCLI extends FalconCLI {
validateNotEmpty(feedInstancePath, PATH_OPT);
FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath, doAsUser);
result = ResponseHelper.getString(resp);
- } else if (optionsList.contains(UPDATE_OPT)) {
+ } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
- } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
+ } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage();
- } else if (optionsList.contains(VALIDATE_OPT)) {
+ } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage();
@@ -281,7 +275,7 @@ public class FalconEntityCLI extends FalconCLI {
validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
colo = getColo(colo);
result = client.resume(entityTypeEnum, entityName, colo, doAsUser).getMessage();
- } else if (optionsList.contains(DELETE_OPT)) {
+ } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
validateColo(optionsList);
validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
result = client.delete(entityTypeEnum, entityName, doAsUser).getMessage();
@@ -341,7 +335,7 @@ public class FalconEntityCLI extends FalconCLI {
try {
EntityList.EntityFieldList.valueOf(s.toUpperCase());
} catch (IllegalArgumentException ie) {
- throw new FalconCLIException("Invalid fields argument : " + FIELDS_OPT);
+ throw new FalconCLIException("Invalid fields argument : " + FalconCLIConstants.FIELDS_OPT);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index c85a196..6496106 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -27,8 +27,11 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconCLIConstants;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
+import org.apache.falcon.resource.ExtensionInstanceList;
+import org.apache.falcon.resource.ExtensionJobList;
import java.io.PrintStream;
import java.util.HashSet;
@@ -41,12 +44,15 @@ import java.util.concurrent.atomic.AtomicReference;
public class FalconExtensionCLI {
public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);
- // Extension artifact repository Commands
+ // Extension commands
public static final String ENUMERATE_OPT = "enumerate";
public static final String DEFINITION_OPT = "definition";
public static final String DESCRIBE_OPT = "describe";
+ public static final String INSTANCES_OPT = "instances";
+ // Input parameters
public static final String ENTENSION_NAME_OPT = "extensionName";
+ public static final String JOB_NAME_OPT = "jobName";
public FalconExtensionCLI() {
}
@@ -59,56 +65,154 @@ public class FalconExtensionCLI {
String result;
String extensionName = commandLine.getOptionValue(ENTENSION_NAME_OPT);
+ String jobName = commandLine.getOptionValue(JOB_NAME_OPT);
+ String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT);
+ String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
if (optionsList.contains(ENUMERATE_OPT)) {
result = client.enumerateExtensions();
- prettyPrintJson(result);
+ result = prettyPrintJson(result);
} else if (optionsList.contains(DEFINITION_OPT)) {
- validateExtensionName(extensionName);
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
result = client.getExtensionDefinition(extensionName);
- prettyPrintJson(result);
+ result = prettyPrintJson(result);
} else if (optionsList.contains(DESCRIBE_OPT)) {
- validateExtensionName(extensionName);
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
result = client.getExtensionDescription(extensionName);
- OUT.get().println(result);
+ } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+ validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+ result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+ validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+ result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+ validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+ result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+ validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+ result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
+ result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
+ result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) {
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
+ result = client.resumeExtensionJob(jobName, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
+ result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
+ validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+ ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
+ commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT));
+ result = jobs != null ? jobs.toString() : "No extension job (" + extensionName + ") found.";
+ } else if (optionsList.contains(INSTANCES_OPT)) {
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
+ ExtensionInstanceList instances = client.listExtensionInstance(jobName, doAsUser,
+ commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.START_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.END_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.INSTANCE_STATUS_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.ORDER_BY_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
+ commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT));
+ result = instances != null ? instances.toString() : "No instance (" + jobName + ") found.";
} else {
throw new FalconCLIException("Invalid extension command");
}
+ OUT.get().println(result);
}
public Options createExtensionOptions() {
Options extensionOptions = new Options();
- OptionGroup group = new OptionGroup();
Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions");
Option definition = new Option(DEFINITION_OPT, false, "Get extension definition");
Option describe = new Option(DESCRIBE_OPT, false, "Get extension description");
+ Option list = new Option(FalconCLIConstants.LIST_OPT, false, "List extension jobs and associated entities");
+ Option instances = new Option(INSTANCES_OPT, false, "List instances of an extension job");
+ Option submit = new Option(FalconCLIConstants.SUBMIT_OPT, false, "Submit an extension job");
+ Option submitAndSchedule = new Option(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT, false,
+ "Submit and schedule an extension job");
+ Option update = new Option(FalconCLIConstants.UPDATE_OPT, false, "Update an extension job");
+ Option validate = new Option(FalconCLIConstants.VALIDATE_OPT, false, "Validate an extension job");
+ Option schedule = new Option(FalconCLIConstants.SCHEDULE_OPT, false, "Schedule an extension job");
+ Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false, "Suspend an extension job");
+ Option resume = new Option(FalconCLIConstants.RESUME_OPT, false, "Resume an extension job");
+ Option delete = new Option(FalconCLIConstants.DELETE_OPT, false, "Delete an extension job");
+
+ OptionGroup group = new OptionGroup();
group.addOption(enumerate);
group.addOption(definition);
group.addOption(describe);
-
+ group.addOption(list);
+ group.addOption(instances);
+ group.addOption(submit);
+ group.addOption(submitAndSchedule);
+ group.addOption(update);
+ group.addOption(validate);
+ group.addOption(schedule);
+ group.addOption(suspend);
+ group.addOption(resume);
+ group.addOption(delete);
extensionOptions.addOptionGroup(group);
- Option name = new Option(ENTENSION_NAME_OPT, true, "Extension name");
- extensionOptions.addOption(name);
+ Option extensionName = new Option(ENTENSION_NAME_OPT, true, "Extension name");
+ Option jobName = new Option(JOB_NAME_OPT, true, "Extension job name");
+ Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status");
+ Option sortOrder = new Option(FalconCLIConstants.SORT_ORDER_OPT, true, "asc or desc order for results");
+ Option offset = new Option(FalconCLIConstants.OFFSET_OPT, true, "Start returning instances from this offset");
+ Option numResults = new Option(FalconCLIConstants.NUM_RESULTS_OPT, true,
+ "Number of results to return per request");
+ Option fields = new Option(FalconCLIConstants.FIELDS_OPT, true, "Entity fields to show for a request");
+ Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true, "doAs user");
+ Option start = new Option(FalconCLIConstants.START_OPT, true, "Start time of instances");
+ Option end = new Option(FalconCLIConstants.END_OPT, true, "End time of instances");
+ Option status = new Option(FalconCLIConstants.STATUS_OPT, true, "Filter returned instances by status");
+ Option orderBy = new Option(FalconCLIConstants.ORDER_BY_OPT, true, "Order returned instances by this field");
+ Option filePath = new Option(FalconCLIConstants.FILE_PATH_OPT, true, "File path of extension parameters");
+
+ extensionOptions.addOption(extensionName);
+ extensionOptions.addOption(jobName);
+ extensionOptions.addOption(instanceStatus);
+ extensionOptions.addOption(sortOrder);
+ extensionOptions.addOption(offset);
+ extensionOptions.addOption(numResults);
+ extensionOptions.addOption(fields);
+ extensionOptions.addOption(doAs);
+ extensionOptions.addOption(start);
+ extensionOptions.addOption(end);
+ extensionOptions.addOption(status);
+ extensionOptions.addOption(orderBy);
+ extensionOptions.addOption(filePath);
return extensionOptions;
}
- private void validateExtensionName(final String extensionName) throws FalconCLIException {
- if (StringUtils.isBlank(extensionName)) {
- throw new FalconCLIException("Extension name cannot be null or empty");
+ private void validateRequiredParameter(final String parameter, final String parameterName)
+ throws FalconCLIException {
+ if (StringUtils.isBlank(parameter)) {
+ throw new FalconCLIException("The parameter " + parameterName + " cannot be null or empty");
}
}
- private static void prettyPrintJson(final String jsonString) {
+ private static String prettyPrintJson(final String jsonString) {
if (StringUtils.isBlank(jsonString)) {
- OUT.get().println("No result returned");
- return;
+ return "No result returned";
}
Gson gson = new GsonBuilder().setPrettyPrinting().create();
JsonParser jp = new JsonParser();
JsonElement je = jp.parse(jsonString.trim());
- OUT.get().println(gson.toJson(je));
+ return gson.toJson(je);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
index bfa1748..436875d 100644
--- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
+++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
@@ -44,6 +44,11 @@ public final class FalconCLIConstants {
public static final String ENTITY_NAME_OPT = "name";
public static final String FILE_PATH_OPT = "file";
public static final String VERSION_OPT = "version";
+ public static final String SUBMIT_OPT = "submit";
+ public static final String UPDATE_OPT = "update";
+ public static final String DELETE_OPT = "delete";
+ public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
+ public static final String VALIDATE_OPT = "validate";
public static final String SCHEDULE_OPT = "schedule";
public static final String SUSPEND_OPT = "suspend";
public static final String RESUME_OPT = "resume";
@@ -52,6 +57,7 @@ public final class FalconCLIConstants {
public static final String DEPENDENCY_OPT = "dependency";
public static final String LIST_OPT = "list";
public static final String SKIPDRYRUN_OPT = "skipDryRun";
+ public static final String FIELDS_OPT = "fields";
public static final String INSTANCE_STATUS_OPT = "instanceStatus";
public static final String NAMESEQ_OPT = "nameseq";
public static final String TAGKEYS_OPT = "tagkeys";
http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index e8ff6f1..36fb873 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -54,6 +54,8 @@ import org.apache.falcon.metadata.RelationshipType;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.ExtensionInstanceList;
+import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
@@ -334,9 +336,19 @@ public class FalconClient extends AbstractFalconClient {
*/
protected static enum ExtensionOperations {
- ENUMERATE("api/extensions/enumerate/", HttpMethod.GET, MediaType.APPLICATION_JSON),
- DESCRIBE("api/extensions/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN),
- DEFINITION("api/extensions/definition", HttpMethod.GET, MediaType.APPLICATION_JSON);
+ ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN),
+ DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ LIST("api/extension/list", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ INSTANCES("api/extension/instances", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ SUBMIT("api/extension/submit", HttpMethod.POST, MediaType.TEXT_XML),
+ SUBMIT_AND_SCHEDULE("api/extension/submitAndSchedule", HttpMethod.POST, MediaType.TEXT_XML),
+ UPDATE("api/extension/update", HttpMethod.POST, MediaType.TEXT_XML),
+ VALIDATE("api/extension/validate", HttpMethod.POST, MediaType.TEXT_XML),
+ SCHEDULE("api/extension/schedule", HttpMethod.POST, MediaType.TEXT_XML),
+ SUSPEND("api/extension/suspend", HttpMethod.POST, MediaType.TEXT_XML),
+ RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML),
+ DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML);
private String path;
private String method;
@@ -740,7 +752,7 @@ public class FalconClient extends AbstractFalconClient {
return stream;
}
- private <T extends APIResult> T getResponse(Class<T> clazz,
+ private <T> T getResponse(Class<T> clazz,
ClientResponse clientResponse) throws FalconCLIException {
printClientResponse(clientResponse);
checkIfSuccessful(clientResponse);
@@ -823,6 +835,12 @@ public class FalconClient extends AbstractFalconClient {
.method(operation.method, ClientResponse.class);
}
+ public ClientResponse call(ExtensionOperations operation) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(MediaType.TEXT_XML)
+ .method(operation.method, ClientResponse.class);
+ }
+
public ClientResponse call(Entities operation, InputStream entityStream) {
return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -834,6 +852,12 @@ public class FalconClient extends AbstractFalconClient {
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
}
+
+ public ClientResponse call(ExtensionOperations operation, InputStream entityStream) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(MediaType.TEXT_XML)
+ .method(operation.method, ClientResponse.class, entityStream);
+ }
}
public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) throws FalconCLIException {
@@ -988,15 +1012,130 @@ public class FalconClient extends AbstractFalconClient {
}
public String enumerateExtensions() throws FalconCLIException {
- return sendExtensionRequest(ExtensionOperations.ENUMERATE, null);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.ENUMERATE.path)
+ .call(ExtensionOperations.ENUMERATE);
+ return getResponse(String.class, clientResponse);
}
public String getExtensionDefinition(final String extensionName) throws FalconCLIException {
- return sendExtensionRequest(ExtensionOperations.DEFINITION, extensionName);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.DEFINITION.path, extensionName)
+ .call(ExtensionOperations.DEFINITION);
+ return getResponse(String.class, clientResponse);
}
public String getExtensionDescription(final String extensionName) throws FalconCLIException {
- return sendExtensionRequest(ExtensionOperations.DESCRIBE, extensionName);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.DESCRIBE.path, extensionName)
+ .call(ExtensionOperations.DESCRIBE);
+ return getResponse(String.class, clientResponse);
+ }
+
+ public APIResult submitExtensionJob(final String extensionName, final String filePath, final String doAsUser)
+ throws FalconCLIException {
+ InputStream entityStream = getServletInputStream(filePath);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.SUBMIT.path, extensionName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.SUBMIT, entityStream);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult submitAndScheduleExtensionJob(final String extensionName, final String filePath,
+ final String doAsUser) throws FalconCLIException {
+ InputStream entityStream = getServletInputStream(filePath);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entityStream);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser)
+ throws FalconCLIException {
+ InputStream entityStream = getServletInputStream(filePath);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.UPDATE.path, extensionName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.UPDATE, entityStream);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult validateExtensionJob(final String extensionName, final String filePath, final String doAsUser)
+ throws FalconCLIException {
+ InputStream entityStream = getServletInputStream(filePath);
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.VALIDATE.path, extensionName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.VALIDATE, entityStream);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.SCHEDULE.path, jobName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.SCHEDULE);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult suspendExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.SUSPEND.path, jobName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.SUSPEND);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult resumeExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.RESUME.path, jobName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.RESUME);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult deleteExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.DELETE.path, jobName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .call(ExtensionOperations.DELETE);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser,
+ final String sortOrder, final String offset,
+ final String numResults, final String fields) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.LIST.path, extensionName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(FIELDS, fields)
+ .addQueryParam(SORT_ORDER, sortOrder)
+ .addQueryParam(OFFSET, offset)
+ .addQueryParam(NUM_RESULTS, numResults)
+ .call(ExtensionOperations.LIST);
+ return getResponse(ExtensionJobList.class, clientResponse);
+ }
+
+ public ExtensionInstanceList listExtensionInstance(final String jobName, final String doAsUser, final String fields,
+ final String start, final String end, final String status,
+ final String orderBy, final String sortOrder,
+ final String offset, final String numResults)
+ throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.INSTANCES.path, jobName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(FIELDS, fields)
+ .addQueryParam(START, start)
+ .addQueryParam(END, end)
+ .addQueryParam(INSTANCE_STATUS, status)
+ .addQueryParam(ORDER_BY, orderBy)
+ .addQueryParam(SORT_ORDER, sortOrder)
+ .addQueryParam(OFFSET, offset)
+ .addQueryParam(NUM_RESULTS, numResults)
+ .call(ExtensionOperations.INSTANCES);
+ return getResponse(ExtensionInstanceList.class, clientResponse);
}
private String sendExtensionRequest(final ExtensionOperations operation,
http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index e12c083..f8de645 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -181,7 +181,7 @@ public class InstancesResult extends APIResult {
+ (this.sourceCluster == null ? "" : ", source-cluster:"
+ this.sourceCluster)
+ (this.cluster == null ? "" : ", cluster:"
- + this.cluster) + "}";
+ + this.cluster) + "}\n";
}
}