You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/01/19 16:14:17 UTC
falcon git commit: FALCON-1707 Code Refactoring for Falcon Client.
Contributed by Ajay Yadava.
Repository: falcon
Updated Branches:
refs/heads/master 0084c355c -> ae59db3c8
FALCON-1707 Code Refactoring for Falcon Client. Contributed by Ajay Yadava.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ae59db3c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ae59db3c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ae59db3c
Branch: refs/heads/master
Commit: ae59db3c834dde64a3cb372904158a8ba7566a3a
Parents: 0084c35
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Jan 19 20:05:32 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Jan 19 20:05:32 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/client/FalconCLIException.java | 2 +-
.../org/apache/falcon/client/FalconClient.java | 931 +++++++------------
3 files changed, 330 insertions(+), 605 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae59db3c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43c7b81..7980393 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Proposed Release Version: 0.9
FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
IMPROVEMENTS
+ FALCON-1707 Code Refactoring for Falcon Client(Ajay Yadava)
+
FALCON-1733 Support for building falcon with JDK 1.8 also(Narayan Periwal via Ajay Yadava)
FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (Pallavi Rao)
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae59db3c/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
index 831f7ac..e7dfa52 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
@@ -77,7 +77,7 @@ public class FalconCLIException extends Exception {
return statusValue + ";" + message;
}
- public static FalconCLIException fromReponse(ClientResponse clientResponse, Class clazz) {
+ public static FalconCLIException fromReponse(ClientResponse clientResponse, Class<? extends APIResult> clazz) {
return new FalconCLIException(getMessage(clientResponse, clazz));
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae59db3c/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 3f3a871..52ecef2 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -18,11 +18,32 @@
package org.apache.falcon.client;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HTTPSProperties;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.security.SecureRandom;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
@@ -50,31 +71,11 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
/**
* Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
@@ -82,30 +83,61 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class FalconClient extends AbstractFalconClient {
- public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
+ public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);
public static final String WS_HEADER_PREFIX = "header:";
public static final String USER = System.getProperty("user.name");
public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER;
+
+
+ public static final String PATH = "path";
+ public static final String COLO = "colo";
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+ public static final String CLUSTER = "cluster";
+ public static final String RUN_ID = "runid";
+ public static final String FORCE = "force";
+ public static final String SHOW_SCHEDULER = "showScheduler";
+ public static final String ENTITY_NAME = "name";
+ public static final String SKIP_DRYRUN = "skipDryRun";
+ public static final String FILTER_BY = "filterBy";
+ public static final String ORDER_BY = "orderBy";
+ public static final String SORT_ORDER = "sortOrder";
+ public static final String OFFSET = "offset";
+ public static final String NUM_RESULTS = "numResults";
+ public static final String START = "start";
+ public static final String END = "end";
+ public static final String INSTANCE_TIME = "instanceTime";
+ public static final String PROPERTIES = "properties";
+ private static final String FIELDS = "fields";
+ private static final String NAME_SUBSEQUENCE = "nameseq";
+ private static final String FILTER_TAGS = "tags";
+ private static final String TAG_KEYWORDS = "tagkeys";
+ private static final String LIFECYCLE = "lifecycle";
+ private static final String NUM_INSTANCES = "numInstances";
+
+
+
+
+ public static final String DO_AS_OPT = "doAs";
/**
* Name of the HTTP cookie used for the authentication token between the client and the server.
*/
public static final String AUTH_COOKIE = "hadoop.auth";
private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
- private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
+ private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
private static final String TEMPLATE_SUFFIX = "-template.xml";
- private static final String PROPERTIES_SUFFIX = ".properties";
+ private static final String PROPERTIES_SUFFIX = ".properties";
public static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() {
@Override
public boolean verify(String hostname, SSLSession sslSession) {
return true;
}
};
-
private final WebResource service;
private final AuthenticatedURL.Token authenticationToken;
@@ -182,10 +214,6 @@ public class FalconClient extends AbstractFalconClient {
this.debugMode = debugMode;
}
- public Properties getClientProperties() {
- return clientProperties;
- }
-
public static AuthenticatedURL.Token getToken(String baseUrl) throws FalconCLIException {
AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token();
try {
@@ -210,7 +238,7 @@ public class FalconClient extends AbstractFalconClient {
VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML),
SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML),
UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML),
- SUBMITandSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML),
+ SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML),
SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML),
SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML),
RESUME("api/entities/resume/", HttpMethod.POST, MediaType.TEXT_XML),
@@ -314,193 +342,195 @@ public class FalconClient extends AbstractFalconClient {
}
public APIResult schedule(EntityType entityType, String entityName, String colo,
- Boolean skipDryRun, String doAsUser, String properties)
- throws FalconCLIException {
-
- return sendEntityRequest(Entities.SCHEDULE, entityType, entityName,
- colo, skipDryRun, doAsUser, properties);
-
+ Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException {
+ String type = entityType.toString().toLowerCase();
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.SCHEDULE.path, type, entityName)
+ .addQueryParam(COLO, colo).addQueryParam(SKIP_DRYRUN, skipDryRun)
+ .addQueryParam(PROPERTIES, properties).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SCHEDULE);
+ return getResponse(APIResult.class, clientResponse);
}
public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser)
throws FalconCLIException {
-
- return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser, null);
-
+ String type = entityType.toString().toLowerCase();
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUSPEND.path, type, entityName)
+ .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SUSPEND);
+ return getResponse(APIResult.class, clientResponse);
}
public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser)
throws FalconCLIException {
-
- return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser, null);
-
+ String type = entityType.toString().toLowerCase();
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.RESUME.path, type, entityName)
+ .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.RESUME);
+ return getResponse(APIResult.class, clientResponse);
}
- public APIResult delete(EntityType entityType, String entityName, String doAsUser)
- throws FalconCLIException {
-
- return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser, null);
-
+ public APIResult delete(EntityType entityType, String entityName, String doAsUser) throws FalconCLIException {
+ String type = entityType.toString().toLowerCase();
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.DELETE.path, type, entityName)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DELETE);
+ return getResponse(APIResult.class, clientResponse);
}
public APIResult validate(String entityType, String filePath, Boolean skipDryRun, String doAsUser)
throws FalconCLIException {
-
InputStream entityStream = getServletInputStream(filePath);
- return sendEntityRequestWithObject(Entities.VALIDATE, entityType,
- entityStream, null, skipDryRun, doAsUser, null);
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.VALIDATE.path, entityType)
+ .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(Entities.VALIDATE, entityStream);
+ return getResponse(APIResult.class, clientResponse);
}
public APIResult submit(String entityType, String filePath, String doAsUser)
throws FalconCLIException {
-
InputStream entityStream = getServletInputStream(filePath);
- return sendEntityRequestWithObject(Entities.SUBMIT, entityType,
- entityStream, null, null, doAsUser, null);
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUBMIT.path, entityType)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SUBMIT, entityStream);
+ return getResponse(APIResult.class, clientResponse);
}
public APIResult update(String entityType, String entityName, String filePath,
- Boolean skipDryRun, String doAsUser)
- throws FalconCLIException {
+ Boolean skipDryRun, String doAsUser) throws FalconCLIException {
InputStream entityStream = getServletInputStream(filePath);
Entities operation = Entities.UPDATE;
- WebResource resource = service.path(operation.path).path(entityType).path(entityName);
- if (null != skipDryRun) {
- resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
- }
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(operation.mimeType).type(MediaType.TEXT_XML)
- .method(operation.method, ClientResponse.class, entityStream);
- printClientResponse(clientResponse);
- checkIfSuccessful(clientResponse);
- return parseAPIResult(clientResponse);
+ ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType, entityName)
+ .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(operation, entityStream);
+ return getResponse(APIResult.class, clientResponse);
}
@Override
- public APIResult submitAndSchedule(String entityType, String filePath,
- Boolean skipDryRun, String doAsUser, String properties)
- throws FalconCLIException {
-
+ public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun,
+ String doAsUser, String properties) throws FalconCLIException {
InputStream entityStream = getServletInputStream(filePath);
- return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE,
- entityType, entityStream, null, skipDryRun, doAsUser, properties);
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUBMITANDSCHEDULE.path, entityType)
+ .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(PROPERTIES, properties).call(Entities.SUBMITANDSCHEDULE, entityStream);
+ return getResponse(APIResult.class, clientResponse);
}
public APIResult getStatus(EntityType entityType, String entityName, String colo,
String doAsUser, boolean showScheduler) throws FalconCLIException {
+ String type = entityType.toString().toLowerCase();
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.STATUS.path, type, entityName)
+ .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(SHOW_SCHEDULER, showScheduler).call(Entities.STATUS);
- return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser, null, showScheduler);
+ return getResponse(APIResult.class, clientResponse);
}
- public Entity getDefinition(String entityType, String entityName, String doAsUser)
- throws FalconCLIException {
-
- return sendDefinitionRequest(Entities.DEFINITION, entityType,
- entityName, doAsUser);
+ public Entity getDefinition(String entityType, String entityName, String doAsUser) throws FalconCLIException {
+ // Forward the proxy user as the removed sendDefinitionRequest call did; otherwise doAsUser is ignored.
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.DEFINITION.path, entityType, entityName)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DEFINITION);
+ String entity = getResponseAsString(clientResponse);
+ return Entity.fromString(EntityType.getEnum(entityType), entity);
}
public EntityList getDependency(String entityType, String entityName, String doAsUser)
throws FalconCLIException {
- return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName, doAsUser);
+
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.DEPENDENCY.path, entityType, entityName)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DEPENDENCY);
+
+ printClientResponse(clientResponse);
+ checkIfSuccessful(clientResponse);
+
+ EntityList result = clientResponse.getEntity(EntityList.class);
+ if (result == null || result.getElements() == null) {
+ return null;
+ }
+ return result;
}
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName,
String startTime, String endTime, String colo) throws FalconCLIException {
-
- WebResource resource = service.path(Entities.SLA.path).path(entityType).queryParam("start", startTime)
- .queryParam("colo", colo);
- if (endTime != null) {
- resource = resource.queryParam("end", endTime);
- }
- if (entityName != null) {
- resource = resource.queryParam("name", entityName);
- }
- ClientResponse clientResponse = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(Entities.SLA.mimeType).type(MediaType.APPLICATION_JSON)
- .method(Entities.SLA.method, ClientResponse.class);
- checkIfSuccessful(clientResponse);
- return clientResponse.getEntity(SchedulableEntityInstanceResult.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.SLA.path, entityType)
+ .addQueryParam(START, startTime).addQueryParam(COLO, colo).addQueryParam(END, endTime)
+ .addQueryParam(ENTITY_NAME, entityName).call(Entities.SLA);
+ return getResponse(SchedulableEntityInstanceResult.class, clientResponse);
}
- public TriageResult triage(String entityType, String entityName, String instanceTime, String colo)
- throws FalconCLIException {
- WebResource resource = service
- .path(Instances.TRIAGE.path).path(entityType).path(entityName)
- .queryParam("start", instanceTime).queryParam("colo", colo);
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(Instances.TRIAGE.mimeType).type(MediaType.TEXT_XML)
- .method(Instances.TRIAGE.method, ClientResponse.class);
-
- printClientResponse(clientResponse);
- checkIfSuccessful(clientResponse);
- return clientResponse.getEntity(TriageResult.class);
+ public TriageResult triage(String entityType, String entityName, String instanceTime,
+ String colo) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.TRIAGE.path, entityType, entityName)
+ .addQueryParam(START, instanceTime).addQueryParam(COLO, colo).call(Instances.TRIAGE);
+ return getResponse(TriageResult.class, clientResponse);
}
@Override
public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords,
String filterBy, String filterTags, String orderBy, String sortOrder,
Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
- return sendListRequest(Entities.LIST, entityType, fields, nameSubsequence, tagKeywords, filterBy,
- filterTags, orderBy, sortOrder, offset, numResults, doAsUser);
+ Entities operation = Entities.LIST;
+ ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType)
+ .addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(NUM_RESULTS, numResults)
+ .addQueryParam(OFFSET, offset).addQueryParam(SORT_ORDER, sortOrder)
+ .addQueryParam(ORDER_BY, orderBy).addQueryParam(FILTER_BY, filterBy)
+ .addQueryParam(FIELDS, fields).addQueryParam(NAME_SUBSEQUENCE, nameSubsequence)
+ .addQueryParam(TAG_KEYWORDS, tagKeywords).addQueryParam(FILTER_TAGS, filterTags)
+ .call(operation);
+
+ printClientResponse(clientResponse);
+ checkIfSuccessful(clientResponse);
+
+ EntityList result = clientResponse.getEntity(EntityList.class);
+ if (result == null || result.getElements() == null) {
+ return null;
+ }
+ return result;
}
@Override
public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end,
String fields, String filterBy, String filterTags,
- String orderBy, String sortOrder,
- Integer offset, Integer numResults, Integer numInstances, String doAsUser)
- throws FalconCLIException {
- return sendEntitySummaryRequest(Entities.SUMMARY, entityType, cluster, start, end, fields, filterBy, filterTags,
- orderBy, sortOrder, offset, numResults, numInstances, doAsUser);
+ String orderBy, String sortOrder, Integer offset, Integer numResults,
+ Integer numInstances, String doAsUser) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUMMARY.path, entityType)
+ .addQueryParam(CLUSTER, cluster).addQueryParam(START, start).addQueryParam(END, end)
+ .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(ORDER_BY, orderBy)
+ .addQueryParam(OFFSET, offset).addQueryParam(NUM_RESULTS, numResults)
+ .addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(FILTER_BY, filterBy)
+ .addQueryParam(NUM_INSTANCES, numInstances).addQueryParam(FIELDS, fields)
+ .addQueryParam(FILTER_TAGS, filterTags).call(Entities.SUMMARY);
+ return getResponse(EntitySummaryResult.class, clientResponse);
}
@Override
public APIResult touch(String entityType, String entityName, String colo,
Boolean skipDryRun, String doAsUser) throws FalconCLIException {
Entities operation = Entities.TOUCH;
- WebResource resource = service.path(operation.path).path(entityType).path(entityName);
- if (colo != null) {
- resource = resource.queryParam("colo", colo);
- }
- if (null != skipDryRun) {
- resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
- }
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(operation.mimeType).type(MediaType.TEXT_XML)
- .method(operation.method, ClientResponse.class);
- printClientResponse(clientResponse);
- checkIfSuccessful(clientResponse);
- return parseAPIResult(clientResponse);
+ ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType, entityName)
+ .addQueryParam(COLO, colo).addQueryParam(SKIP_DRYRUN, skipDryRun)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(operation);
+ return getResponse(APIResult.class, clientResponse);
}
public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles,
String filterBy, String orderBy, String sortOrder,
Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
-
- return sendInstanceRequest(Instances.RUNNING, type, entity, null, null,
- null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser)
- .getEntity(InstancesResult.class);
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.RUNNING.path, type, entity)
+ .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy)
+ .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset)
+ .addQueryParam(NUM_RESULTS, numResults).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(DO_AS_OPT, doAsUser).call(Instances.RUNNING);
+ return getResponse(InstancesResult.class, clientResponse);
}
@Override
public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, String colo,
List<LifeCycle> lifeCycles, String filterBy, String orderBy,
- String sortOrder, Integer offset, Integer numResults, String doAsUser,
- Boolean allAttempts) throws FalconCLIException {
- return sendInstanceRequest(Instances.STATUS, type, entity, start, end,
- null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser, allAttempts)
- .getEntity(InstancesResult.class);
+ String sortOrder, Integer offset, Integer numResults,
+ String doAsUser, Boolean allAttempts) throws FalconCLIException {
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ // NOTE(review): allAttempts is no longer forwarded, although the removed sendInstanceRequest call
+ // passed it. Confirm the server-side query key and restore it.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.STATUS.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FILTER_BY, filterBy)
+ .addQueryParam(ORDER_BY, orderBy).addQueryParam(SORT_ORDER, sortOrder)
+ .addQueryParam(OFFSET, offset).addQueryParam(NUM_RESULTS, numResults)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Instances.STATUS);
+ return getResponse(InstancesResult.class, clientResponse);
}
public InstancesSummaryResult getSummaryOfInstances(String type, String entity,
@@ -508,48 +538,47 @@ public class FalconClient extends AbstractFalconClient {
String colo, List<LifeCycle> lifeCycles,
String filterBy, String orderBy, String sortOrder,
String doAsUser) throws FalconCLIException {
-
- return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, null,
- null, colo, lifeCycles, filterBy, orderBy, sortOrder, 0, null, doAsUser)
- .getEntity(InstancesSummaryResult.class);
+ // filterBy/orderBy/sortOrder are forwarded as in the removed sendInstanceRequest call, and the
+ // proxy user goes under DO_AS_OPT (USER is the local login name, not a query key).
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.SUMMARY.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FILTER_BY, filterBy)
+ .addQueryParam(ORDER_BY, orderBy).addQueryParam(SORT_ORDER, sortOrder)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Instances.SUMMARY);
+ return getResponse(InstancesSummaryResult.class, clientResponse);
}
public FeedInstanceResult getFeedListing(String type, String entity, String start,
- String end, String colo, String doAsUser)
- throws FalconCLIException {
-
- return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null,
- null, colo, null, "", "", "", 0, null, doAsUser).getEntity(FeedInstanceResult.class);
+ String end, String colo, String doAsUser) throws FalconCLIException {
+ // Build the URL from the LISTING endpoint, not KILL, and send the proxy user under DO_AS_OPT
+ // (USER is the local login name, not a query key).
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.LISTING.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Instances.LISTING);
+ return getResponse(FeedInstanceResult.class, clientResponse);
}
public InstancesResult killInstances(String type, String entity, String start,
String end, String colo, String clusters,
String sourceClusters, List<LifeCycle> lifeCycles,
- String doAsUser)
- throws FalconCLIException, UnsupportedEncodingException {
-
- return sendInstanceRequest(Instances.KILL, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser);
+ String doAsUser) throws FalconCLIException, UnsupportedEncodingException {
+ InputStream props = getServletInputStream(clusters, sourceClusters, null);
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.KILL.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(Instances.KILL, props);
+ return getResponse(InstancesResult.class, clientResponse);
}
- public InstancesResult suspendInstances(String type, String entity, String start,
- String end, String colo, String clusters,
- String sourceClusters, List<LifeCycle> lifeCycles,
- String doAsUser)
- throws FalconCLIException, UnsupportedEncodingException {
-
- return sendInstanceRequest(Instances.SUSPEND, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser);
+ public InstancesResult suspendInstances(String type, String entity, String start, String end, String colo,
+ String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+ String doAsUser) throws FalconCLIException, UnsupportedEncodingException {
+ // Send clusters/sourceClusters as the request body, as the removed code and killInstances do;
+ // without it both parameters are silently ignored.
+ InputStream props = getServletInputStream(clusters, sourceClusters, null);
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.SUSPEND.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(Instances.SUSPEND, props);
+ return getResponse(InstancesResult.class, clientResponse);
}
- public InstancesResult resumeInstances(String type, String entity, String start,
- String end, String colo, String clusters,
- String sourceClusters, List<LifeCycle> lifeCycles,
- String doAsUser)
- throws FalconCLIException, UnsupportedEncodingException {
-
- return sendInstanceRequest(Instances.RESUME, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser);
+ public InstancesResult resumeInstances(String type, String entity, String start, String end, String colo,
+ String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+ String doAsUser) throws FalconCLIException, UnsupportedEncodingException {
+ // Send clusters/sourceClusters as the request body, as the removed code and killInstances do;
+ // without it both parameters are silently ignored.
+ InputStream props = getServletInputStream(clusters, sourceClusters, null);
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.RESUME.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(Instances.RESUME, props);
+ return getResponse(InstancesResult.class, clientResponse);
}
public InstancesResult rerunInstances(String type, String entity, String start,
@@ -573,8 +602,12 @@ public class FalconClient extends AbstractFalconClient {
}
}
String temp = (buffer.length() == 0) ? null : buffer.toString();
- return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced, doAsUser);
+ InputStream props = getServletInputStream(clusters, sourceClusters, temp);
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.RERUN.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FORCE, isForced)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Instances.RERUN, props);
+ return getResponse(InstancesResult.class, clientResponse);
}
public InstancesResult getLogsOfInstances(String type, String entity, String start,
@@ -582,9 +615,13 @@ public class FalconClient extends AbstractFalconClient {
List<LifeCycle> lifeCycles, String filterBy,
String orderBy, String sortOrder, Integer offset,
Integer numResults, String doAsUser) throws FalconCLIException {
- return sendInstanceRequest(Instances.LOG, type, entity, start, end,
- null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser)
- .getEntity(InstancesResult.class);
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.LOG.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
+ .addQueryParam(RUN_ID, runId).addQueryParam(LIFECYCLE, lifeCycles, type)
+ .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy)
+ .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset)
+ .addQueryParam(NUM_RESULTS, numResults).addQueryParam(DO_AS_OPT, doAsUser).call(Instances.LOG);
+ return getResponse(InstancesResult.class, clientResponse);
}
public InstancesResult getParamsOfInstance(String type, String entity,
@@ -596,8 +633,11 @@ public class FalconClient extends AbstractFalconClient {
throw new FalconCLIException("Start date is mandatory and should be"
+ " a valid date in YYYY-MM-DDTHH:MMZ format.");
}
- return sendInstanceRequest(Instances.PARAMS, type, entity,
- start, null, null, null, colo, lifeCycles, doAsUser);
+
+ // The proxy user goes under DO_AS_OPT; USER is the local login name and must not be used as a query key.
+ ClientResponse clientResponse = new ResourceBuilder().path(Instances.PARAMS.path, type, entity)
+ .addQueryParam(START, start).addQueryParam(LIFECYCLE, lifeCycles, type)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(Instances.PARAMS);
+ return getResponse(InstancesResult.class, clientResponse);
}
public String getThreadDump(String doAsUser) throws FalconCLIException {
@@ -611,17 +651,8 @@ public class FalconClient extends AbstractFalconClient {
public int getStatus(String doAsUser) throws FalconCLIException {
AdminOperations job = AdminOperations.VERSION;
-
- WebResource resource = service.path(job.path);
-
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(job.mimeType).type(MediaType.TEXT_PLAIN)
- .method(job.method, ClientResponse.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(job);
printClientResponse(clientResponse);
return clientResponse.getStatus();
}
@@ -638,18 +669,8 @@ public class FalconClient extends AbstractFalconClient {
public LineageGraphResult getEntityLineageGraph(String pipelineName, String doAsUser) throws FalconCLIException {
MetadataOperations operation = MetadataOperations.LINEAGE;
-
- WebResource resource = service.path(operation.path)
- .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName);
-
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(operation.mimeType).type(operation.mimeType)
- .method(operation.method, ClientResponse.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(operation.path).addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName).call(operation);
printClientResponse(clientResponse);
checkIfSuccessful(clientResponse);
return clientResponse.getEntity(LineageGraphResult.class);
@@ -682,326 +703,118 @@ public class FalconClient extends AbstractFalconClient {
return stream;
}
- private APIResult sendEntityRequest(Entities entities, EntityType entityType, String entityName,
- String colo, Boolean skipDryRun, String doAsUser, String properties,
- boolean showScheduler) throws FalconCLIException {
- WebResource resource = service.path(entities.path)
- .path(entityType.toString().toLowerCase()).path(entityName);
- if (colo != null) {
- resource = resource.queryParam("colo", colo);
- }
- if (null != skipDryRun) {
- resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
- }
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
-
- if (StringUtils.isNotEmpty(properties)) {
- resource = resource.queryParam("properties", properties);
- }
-
- resource = resource.queryParam("showScheduler", Boolean.toString(showScheduler));
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(entities.mimeType).type(MediaType.TEXT_XML)
- .method(entities.method, ClientResponse.class);
-
+ private <T extends APIResult> T getResponse(Class<T> clazz,
+ ClientResponse clientResponse) throws FalconCLIException {
printClientResponse(clientResponse);
+ checkIfSuccessful(clientResponse, clazz);
+ return clientResponse.getEntity(clazz);
+ }
+ private String getResponseAsString(ClientResponse clientResponse) throws FalconCLIException {
+ printClientResponse(clientResponse);
checkIfSuccessful(clientResponse);
-
- // should be removed return parseAPIResult(clientResponse);
- return clientResponse.getEntity(APIResult.class);
+ return clientResponse.getEntity(String.class);
}
- private APIResult sendEntityRequest(Entities entities, EntityType entityType,
- String entityName, String colo, Boolean skipDryRun,
- String doAsUser, String properties) throws FalconCLIException {
- return sendEntityRequest(entities, entityType, entityName, colo, skipDryRun, doAsUser, properties, false);
- }
+ private class ResourceBuilder {
+ WebResource resource;
- private WebResource addParamsToResource(WebResource resource,
- String start, String end, String runId, String colo,
- String fields, String nameSubsequence, String tagKeywords, String filterBy,
- String tags, String orderBy, String sortOrder, Integer offset,
- Integer numResults, Integer numInstances, Boolean isForced,
- String doAsUser, Boolean allAttempts) {
- if (StringUtils.isNotEmpty(fields)) {
- resource = resource.queryParam("fields", fields);
- }
- if (StringUtils.isNotEmpty(nameSubsequence)) {
- resource = resource.queryParam("nameseq", nameSubsequence);
- }
- if (StringUtils.isNotEmpty(tagKeywords)) {
- resource = resource.queryParam("tagkeys", tagKeywords);
- }
- if (StringUtils.isNotEmpty(tags)) {
- resource = resource.queryParam("tags", tags);
- }
- if (StringUtils.isNotEmpty(filterBy)) {
- resource = resource.queryParam("filterBy", filterBy);
- }
- if (StringUtils.isNotEmpty(orderBy)) {
- resource = resource.queryParam("orderBy", orderBy);
- }
- if (StringUtils.isNotEmpty(sortOrder)) {
- resource = resource.queryParam("sortOrder", sortOrder);
- }
- if (StringUtils.isNotEmpty(start)) {
- resource = resource.queryParam("start", start);
- }
- if (StringUtils.isNotEmpty(end)) {
- resource = resource.queryParam("end", end);
- }
- if (runId != null) {
- resource = resource.queryParam("runid", runId);
- }
- if (colo != null) {
- resource = resource.queryParam("colo", colo);
- }
- if (offset != null) {
- resource = resource.queryParam("offset", offset.toString());
- }
- if (numResults != null) {
- resource = resource.queryParam("numResults", numResults.toString());
- }
- if (numInstances != null) {
- resource = resource.queryParam("numInstances", numInstances.toString());
- }
- if (isForced != null) {
- resource = resource.queryParam("force", String.valueOf(isForced));
+ private ResourceBuilder path(String... paths) {
+ for (String path : paths) {
+ if (resource == null) {
+ resource = service.path(path);
+ } else {
+ resource = resource.path(path);
+ }
+ }
+ return this;
}
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
- if (allAttempts != null) {
- resource = resource.queryParam("allAttempts", String.valueOf(allAttempts));
+ public ResourceBuilder addQueryParam(String paramName, Integer value) {
+ if (value != null) {
+ resource = resource.queryParam(paramName, value.toString());
+ }
+ return this;
}
- return resource;
-
- }
- private EntitySummaryResult sendEntitySummaryRequest(Entities entities, String entityType, String cluster,
- String start, String end,
- String fields, String filterBy, String filterTags,
- String orderBy, String sortOrder, Integer offset, Integer numResults,
- Integer numInstances, String doAsUser) throws FalconCLIException {
- WebResource resource = service.path(entities.path).path(entityType);
- if (StringUtils.isNotEmpty(cluster)) {
- resource = resource.queryParam("cluster", cluster);
+ public ResourceBuilder addQueryParam(String paramName, Boolean paramValue) {
+ if (paramValue != null) {
+ resource = resource.queryParam(paramName, String.valueOf(paramValue));
+ }
+ return this;
}
- resource = addParamsToResource(resource, start, end, null, null,
- fields, null, null, filterBy, filterTags,
- orderBy, sortOrder,
- offset, numResults, numInstances, null, doAsUser, false);
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(entities.mimeType).type(MediaType.TEXT_XML)
- .method(entities.method, ClientResponse.class);
-
- printClientResponse(clientResponse);
-
- checkIfSuccessful(clientResponse);
- return clientResponse.getEntity(EntitySummaryResult.class);
- }
- //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
- private Entity sendDefinitionRequest(Entities entities, String entityType,
- String entityName, String doAsUser) throws FalconCLIException {
+ public ResourceBuilder addQueryParam(String paramName, String paramValue) {
+ if (StringUtils.isNotBlank(paramValue)) {
+ resource = resource.queryParam(paramName, paramValue);
+ }
+ return this;
+ }
- WebResource resource = service.path(entities.path).path(entityType).path(entityName);
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+ public ResourceBuilder addQueryParam(String paramName, List<LifeCycle> lifeCycles,
+ String type) throws FalconCLIException {
+ if (lifeCycles != null) {
+ checkLifeCycleOption(lifeCycles, type);
+ for (LifeCycle lifeCycle : lifeCycles) {
+ resource = resource.queryParam(paramName, lifeCycle.toString());
+ }
+ }
+ return this;
}
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ private ClientResponse call(Entities entities) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
.method(entities.method, ClientResponse.class);
-
- printClientResponse(clientResponse);
-
- checkIfSuccessful(clientResponse);
- String entity = clientResponse.getEntity(String.class);
-
- return Entity.fromString(EntityType.getEnum(entityType), entity);
-
- }
-
- private EntityList sendDependencyRequest(Entities entities, String entityType,
- String entityName, String doAsUser) throws FalconCLIException {
-
- WebResource resource = service.path(entities.path).path(entityType).path(entityName);
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
}
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(entities.mimeType).type(MediaType.TEXT_XML)
- .method(entities.method, ClientResponse.class);
- printClientResponse(clientResponse);
-
- checkIfSuccessful(clientResponse);
-
- return parseEntityList(clientResponse);
- }
-
- private APIResult sendEntityRequestWithObject(Entities entities, String entityType,
- Object requestObject, String colo, Boolean skipDryRun,
- String doAsUser, String properties) throws FalconCLIException {
- WebResource resource = service.path(entities.path)
- .path(entityType);
- if (colo != null) {
- resource = resource.queryParam("colo", colo);
+ public ClientResponse call(AdminOperations operation) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(MediaType.TEXT_XML)
+ .method(operation.method, ClientResponse.class);
}
- if (null != skipDryRun) {
- resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ private ClientResponse call(MetadataOperations metadataOperations) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(metadataOperations.mimeType).type(MediaType.TEXT_XML)
+ .method(metadataOperations.method, ClientResponse.class);
}
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+ public ClientResponse call(Instances operation) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(MediaType.TEXT_XML)
+ .method(operation.method, ClientResponse.class);
}
- if (StringUtils.isNotEmpty(properties)) {
- resource = resource.queryParam("properties", properties);
+ public ClientResponse call(Entities operation, InputStream entityStream) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(MediaType.TEXT_XML)
+ .method(operation.method, ClientResponse.class, entityStream);
}
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(entities.mimeType).type(MediaType.TEXT_XML)
- .method(entities.method, ClientResponse.class, requestObject);
-
- printClientResponse(clientResponse);
-
- checkIfSuccessful(clientResponse);
-
- //remove this return parseAPIResult(clientResponse);
- return clientResponse.getEntity(APIResult.class);
+ public ClientResponse call(Instances operation, InputStream entityStream) {
+ return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(MediaType.TEXT_XML)
+ .method(operation.method, ClientResponse.class, entityStream);
+ }
}
public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) throws FalconCLIException {
Entities api = Entities.LOOKUP;
- WebResource resource = service.path(api.path).path(type);
- resource = resource.queryParam("path", path);
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
- ClientResponse response = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(api.mimeType)
- .method(api.method, ClientResponse.class);
- printClientResponse(response);
- checkIfSuccessful(response);
- return response.getEntity(FeedLookupResult.class);
- }
-
- //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
- private InstancesResult sendInstanceRequest(Instances instances, String type,
- String entity, String start, String end, InputStream props,
- String runid, String colo,
- List<LifeCycle> lifeCycles, String doAsUser) throws FalconCLIException {
- return sendInstanceRequest(instances, type, entity, start, end, props,
- runid, colo, lifeCycles, "", "", "", 0, null, doAsUser, null)
- .getEntity(InstancesResult.class);
- }
-
- private InstancesResult sendInstanceRequest(Instances instances, String type,
- String entity, String start, String end, InputStream props,
- String runid, String colo, List<LifeCycle> lifeCycles,
- Boolean isForced, String doAsUser) throws FalconCLIException {
- return sendInstanceRequest(instances, type, entity, start, end, props,
- runid, colo, lifeCycles, "", "", "", 0, null, isForced, doAsUser,
- null).getEntity(InstancesResult.class);
- }
-
- private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
- String start, String end, InputStream props, String runid, String colo,
- List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
- Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
-
- return sendInstanceRequest(instances, type, entity, start, end, props,
- runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null, doAsUser, null);
- }
-
- private ClientResponse sendInstanceRequest(Instances instances, String type, String entity, String start,
- String end, InputStream props, String runid, String colo,
- List<LifeCycle> lifeCycles, String filterBy, String orderBy,
- String sortOrder, Integer offset, Integer numResults, String doAsUser,
- Boolean allAttempts) throws FalconCLIException {
- return sendInstanceRequest(instances, type, entity, start, end, props, runid, colo, lifeCycles, filterBy,
- orderBy, sortOrder, offset, numResults, null, doAsUser, allAttempts);
- }
-
- private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
- String start, String end, InputStream props, String runid, String colo,
- List<LifeCycle> lifeCycles, String filterBy, String orderBy,
- String sortOrder, Integer offset, Integer numResults, Boolean isForced,
- String doAsUser, Boolean allAttempts) throws FalconCLIException {
- checkType(type);
- WebResource resource = service.path(instances.path).path(type)
- .path(entity);
-
- resource = addParamsToResource(resource, start, end, runid, colo,
- null, null, null, filterBy, null, orderBy, sortOrder, offset, numResults, null, isForced, doAsUser,
- allAttempts);
-
- if (lifeCycles != null) {
- checkLifeCycleOption(lifeCycles, type);
- for (LifeCycle lifeCycle : lifeCycles) {
- resource = resource.queryParam("lifecycle", lifeCycle.toString());
- }
- }
-
- ClientResponse clientResponse;
- if (props == null) {
- clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(instances.mimeType)
- .method(instances.method, ClientResponse.class);
- } else {
- clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(instances.mimeType)
- .method(instances.method, ClientResponse.class, props);
- }
- printClientResponse(clientResponse);
- checkIfSuccessful(clientResponse);
- return clientResponse;
+ ClientResponse response = new ResourceBuilder().path(api.path, type).addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(PATH, path).call(api);
+ return getResponse(FeedLookupResult.class, response);
}
-
public FeedInstanceResult getFeedInstanceListing(String type, String entity, String start, String end, String colo
, String doAsUser) throws FalconCLIException {
checkType(type);
Instances api = Instances.LISTING;
- WebResource resource = service.path(api.path).path(type).path(entity);
- if (colo != null) {
- resource = resource.queryParam("colo", colo);
- }
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
- if (StringUtils.isNotEmpty(start)){
- resource = resource.queryParam("start", start);
- }
- if (StringUtils.isNotEmpty(end)) {
- resource = resource.queryParam("end", end);
- }
-
- ClientResponse clientResponse = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(api.mimeType)
- .method(api.method, ClientResponse.class);
-
- printClientResponse(clientResponse);
- checkIfSuccessful(clientResponse, FeedInstanceResult.class);
- return clientResponse.getEntity(FeedInstanceResult.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(api.path, type, entity)
+ .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(START, start)
+ .addQueryParam(END, end).call(api);
+ return getResponse(FeedInstanceResult.class, clientResponse);
}
@@ -1009,17 +822,9 @@ public class FalconClient extends AbstractFalconClient {
String colo) throws FalconCLIException {
checkType(entityType);
Instances api = Instances.DEPENDENCY;
-
- WebResource resource = service.path(api.path).path(entityType).path(entityName);
- resource = resource.queryParam("instanceTime", instanceTime);
- resource = resource.queryParam("colo", colo);
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(api.mimeType)
- .method(api.method, ClientResponse.class);
- printClientResponse(clientResponse);
- checkIfSuccessful(clientResponse);
- return clientResponse.getEntity(InstanceDependencyResult.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(api.path, entityType, entityName)
+ .addQueryParam(COLO, colo).addQueryParam(INSTANCE_TIME, instanceTime).call(api);
+ return getResponse(InstanceDependencyResult.class, clientResponse);
}
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
@@ -1047,43 +852,10 @@ public class FalconClient extends AbstractFalconClient {
}
}
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
- private EntityList sendListRequest(Entities entities, String entityType, String fields, String nameSubsequence,
- String tagKeywords, String filterBy, String filterTags, String orderBy,
- String sortOrder, Integer offset, Integer numResults, String doAsUser
- ) throws FalconCLIException {
- WebResource resource = service.path(entities.path)
- .path(entityType);
- resource = addParamsToResource(resource, null, null, null, null, fields, nameSubsequence, tagKeywords,
- filterBy, filterTags, orderBy, sortOrder, offset, numResults, null, null, doAsUser, false);
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(entities.mimeType).type(MediaType.TEXT_XML)
- .method(entities.method, ClientResponse.class);
-
- printClientResponse(clientResponse);
-
- checkIfSuccessful(clientResponse);
-
- return parseEntityList(clientResponse);
- }
- // RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
private String sendAdminRequest(AdminOperations job, String doAsUser) throws FalconCLIException {
- WebResource resource = service.path(job.path);
-
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(job.mimeType)
- .type(job.mimeType)
- .method(job.method, ClientResponse.class);
- printClientResponse(clientResponse);
- return clientResponse.getEntity(String.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(job);
+ return getResponseAsString(clientResponse);
}
private String sendRequestForReplicationMetrics(final MetadataOperations operation, final String schedEntityType,
@@ -1161,22 +933,6 @@ public class FalconClient extends AbstractFalconClient {
return clientResponse.getEntity(String.class);
}
- private APIResult parseAPIResult(ClientResponse clientResponse)
- throws FalconCLIException {
-
- return clientResponse.getEntity(APIResult.class);
- }
-
- private EntityList parseEntityList(ClientResponse clientResponse)
- throws FalconCLIException {
-
- EntityList result = clientResponse.getEntity(EntityList.class);
- if (result == null || result.getElements() == null) {
- return null;
- }
- return result;
-
- }
public String getVertex(String id, String doAsUser) throws FalconCLIException {
return sendMetadataLineageRequest(MetadataOperations.VERTICES, id, doAsUser);
@@ -1208,11 +964,8 @@ public class FalconClient extends AbstractFalconClient {
return recipePath;
}
- public APIResult submitRecipe(String recipeName,
- String recipeToolClassName,
- final String recipeOperation,
- String recipePropertiesFile,
- Boolean skipDryRun,
+ public APIResult submitRecipe(String recipeName, String recipeToolClassName,
+ final String recipeOperation, String recipePropertiesFile, Boolean skipDryRun,
final String doAsUser) throws FalconCLIException {
String recipePath = getRecipePath(recipePropertiesFile);
@@ -1269,54 +1022,25 @@ public class FalconClient extends AbstractFalconClient {
private String sendMetadataLineageRequest(MetadataOperations job, String id,
String doAsUser) throws FalconCLIException {
- WebResource resource = service.path(job.path).path(id);
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(job.mimeType)
- .type(job.mimeType)
- .method(job.method, ClientResponse.class);
- printClientResponse(clientResponse);
- return clientResponse.getEntity(String.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(job.path, id).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(job);
+ return getResponseAsString(clientResponse);
}
private String sendMetadataLineageRequest(MetadataOperations job, String key,
String value, String doAsUser) throws FalconCLIException {
- WebResource resource = service.path(job.path);
-
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
- ClientResponse clientResponse = resource.queryParam("key", key)
- .queryParam("value", value)
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(job.mimeType)
- .type(job.mimeType)
- .method(job.method, ClientResponse.class);
- printClientResponse(clientResponse);
- return clientResponse.getEntity(String.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(KEY, key).addQueryParam(VALUE, value).call(job);
+ return getResponseAsString(clientResponse);
}
private String sendMetadataLineageRequestForEdges(MetadataOperations job, String id,
String direction, String doAsUser) throws FalconCLIException {
- WebResource resource = service.path(job.path)
- .path(id).path(direction);
-
- if (StringUtils.isNotEmpty(doAsUser)) {
- resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
- }
-
- ClientResponse clientResponse = resource
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(job.mimeType)
- .type(job.mimeType)
- .method(job.method, ClientResponse.class);
- printClientResponse(clientResponse);
- return clientResponse.getEntity(String.class);
+ ClientResponse clientResponse = new ResourceBuilder().path(job.path, id, direction)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(job);
+ return getResponseAsString(clientResponse);
}
+
/*
* Do not use this getMessage; use the overloaded one
* with clazz as param for better error handling
@@ -1329,15 +1053,14 @@ public class FalconClient extends AbstractFalconClient {
}
}
- private void checkIfSuccessful(ClientResponse clientResponse, Class clazz) throws FalconCLIException {
+ private void checkIfSuccessful(ClientResponse clientResponse,
+ Class<? extends APIResult> clazz) throws FalconCLIException {
Response.Status.Family statusFamily = clientResponse.getClientResponseStatus().getFamily();
- if (statusFamily != Response.Status.Family.SUCCESSFUL
- && statusFamily != Response.Status.Family.INFORMATIONAL) {
+ if (statusFamily != Response.Status.Family.SUCCESSFUL && statusFamily != Response.Status.Family.INFORMATIONAL) {
throw FalconCLIException.fromReponse(clientResponse, clazz);
}
}
-
private void printClientResponse(ClientResponse clientResponse) {
if (getDebugMode()) {
OUT.get().println(clientResponse.toString());