You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/20 22:32:30 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-683] Add
azkaban client retry logic.
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 86b6762 [GOBBLIN-683] Add azkaban client retry logic.
86b6762 is described below
commit 86b67626371720a57fc73bdc3172b4ace2803eb2
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Feb 20 14:32:25 2019 -0800
[GOBBLIN-683] Add azkaban client retry logic.
Closes #2555 from yukuai518/session
---
.../modules/orchestration/AzkabanClient.java | 358 ++++++++-----------
.../orchestration/AzkabanClientException.java | 2 +-
.../modules/orchestration/AzkabanClientParams.java | 5 +-
.../modules/orchestration/AzkabanClientStatus.java | 28 --
.../orchestration/AzkabanExecuteFlowStatus.java | 4 -
.../AzkabanFetchExecuteFlowStatus.java | 4 -
.../orchestration/AzkabanMultiCallables.java | 378 +++++++++++++++++++++
...banClientException.java => AzkabanSuccess.java} | 18 +-
...Exception.java => InvalidSessionException.java} | 14 +-
...ion.java => UnreachableStatementException.java} | 14 +-
.../modules/orchestration/AzkabanClientTest.java | 63 ++--
11 files changed, 580 insertions(+), 308 deletions(-)
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index ec6bce4..1f9293a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -21,47 +21,50 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
import org.apache.http.HttpEntity;
-import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
-import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.Builder;
+import org.apache.gobblin.util.ExecutorsUtils;
+
/**
* A simple http based client that uses Ajax API to communicate with Azkaban server.
@@ -72,15 +75,16 @@ import lombok.Builder;
*/
public class AzkabanClient implements Closeable {
protected final String username;
+ protected final String password;
protected final String url;
protected final long sessionExpireInMin; // default value is 12h.
protected SessionManager sessionManager;
- protected String password;
protected String sessionId;
protected long sessionCreationTime = 0;
protected CloseableHttpClient httpClient;
-
- private boolean customHttpClientProvided = true;
+ private ExecutorService executorService;
+ private Closer closer = Closer.create();
+ private Retryer<AzkabanClientStatus> retryer;
private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
/**
@@ -92,7 +96,8 @@ public class AzkabanClient implements Closeable {
String url,
long sessionExpireInMin,
CloseableHttpClient httpClient,
- SessionManager sessionManager)
+ SessionManager sessionManager,
+ ExecutorService executorService)
throws AzkabanClientException {
this.username = username;
this.password = password;
@@ -100,9 +105,16 @@ public class AzkabanClient implements Closeable {
this.sessionExpireInMin = sessionExpireInMin;
this.httpClient = httpClient;
this.sessionManager = sessionManager;
+ this.executorService = executorService;
this.initializeClient();
this.initializeSessionManager();
-
+ this.intializeExecutorService();
+ this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
+ .retryIfExceptionOfType(InvalidSessionException.class)
+ .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, TimeUnit.SECONDS, this.executorService))
+ .withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
+ .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+ .build();
this.sessionId = this.sessionManager.fetchSession();
this.sessionCreationTime = System.nanoTime();
}
@@ -110,7 +122,7 @@ public class AzkabanClient implements Closeable {
private void initializeClient() throws AzkabanClientException {
if (this.httpClient == null) {
this.httpClient = createHttpClient();
- this.customHttpClientProvided = false;
+ this.closer.register(this.httpClient);
}
}
@@ -123,6 +135,12 @@ public class AzkabanClient implements Closeable {
}
}
+ private void intializeExecutorService() {
+ if (this.executorService == null) {
+ this.executorService = Executors.newFixedThreadPool(30);
+ }
+ }
+
/**
* Create a {@link CloseableHttpClient} used to communicate with Azkaban server.
* Derived class can configure different http client by overriding this method.
@@ -159,10 +177,19 @@ public class AzkabanClient implements Closeable {
/**
* When current session expired, use {@link SessionManager} to refresh the session id.
*/
- private void refreshSession() throws AzkabanClientException {
+ void refreshSession(boolean forceRefresh) throws AzkabanClientException {
Preconditions.checkArgument(this.sessionCreationTime != 0);
- if ((System.nanoTime() - this.sessionCreationTime) > Duration.ofMinutes(this.sessionExpireInMin).toNanos()) {
+ boolean expired = (System.nanoTime() - this.sessionCreationTime) > Duration
+ .ofMinutes(this.sessionExpireInMin)
+ .toNanos();
+
+ if (expired) {
log.info("Session expired. Generating a new session.");
+ } else if (forceRefresh) {
+ log.info("Force to refresh session. Generating a new session.");
+ }
+
+ if (expired || forceRefresh) {
this.sessionId = this.sessionManager.fetchSession();
this.sessionCreationTime = System.nanoTime();
}
@@ -226,6 +253,11 @@ public class AzkabanClient implements Closeable {
.replaceAll("\"", ""))) {
String message = (null != jsonObject.get(AzkabanClientParams.MESSAGE)) ? jsonObject.get(AzkabanClientParams.MESSAGE).toString()
.replaceAll("\"", "") : "Unknown issue";
+
+ if (message.contains("Invalid Session")) {
+ throw new InvalidSessionException(message);
+ }
+
throw new IOException(message);
}
@@ -243,34 +275,16 @@ public class AzkabanClient implements Closeable {
*
* @return A status object indicating if AJAX request is successful.
*/
- public AzkabanClientStatus createProject(
- String projectName,
- String description) {
- try {
- refreshSession();
- HttpPost httpPost = new HttpPost(this.url + "/manager");
- List<NameValuePair> nvps = new ArrayList<>();
- nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "create"));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, projectName));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.DESCRIPTION, description));
- httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-
- Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
- Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
- httpPost.setHeaders(new Header[]{contentType, requestType});
-
- CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
- try {
- handleResponse(response);
- return new AzkabanClientStatus.SUCCESS();
- } finally {
- response.close();
- }
- } catch (Exception e) {
- return new AzkabanClientStatus.FAIL("Azkaban client cannot create project.", e);
- }
+ public AzkabanClientStatus createProject(String projectName,
+ String description) throws AzkabanClientException {
+ AzkabanMultiCallables.CreateProjectCallable callable =
+ AzkabanMultiCallables.CreateProjectCallable.builder()
+ .client(this)
+ .projectName(projectName)
+ .description(description)
+ .build();
+
+ return runWithRetry(callable, AzkabanClientStatus.class);
}
/**
@@ -281,28 +295,15 @@ public class AzkabanClient implements Closeable {
*
* @return A status object indicating if AJAX request is successful.
*/
- public AzkabanClientStatus deleteProject(String projectName) {
- try {
- refreshSession();
- List<NameValuePair> nvps = new ArrayList<>();
- nvps.add(new BasicNameValuePair(AzkabanClientParams.DELETE, "true"));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
-
- Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
- Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+ public AzkabanClientStatus deleteProject(String projectName) throws AzkabanClientException {
- HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
- httpGet.setHeaders(new Header[]{contentType, requestType});
+ AzkabanMultiCallables.DeleteProjectCallable callable =
+ AzkabanMultiCallables.DeleteProjectCallable.builder()
+ .client(this)
+ .projectName(projectName)
+ .build();
- CloseableHttpResponse response = this.httpClient.execute(httpGet);
- response.close();
- return new AzkabanClientStatus.SUCCESS();
-
- } catch (Exception e) {
- return new AzkabanClientStatus.FAIL("Azkaban client cannot delete project = "
- + projectName, e);
- }
+ return runWithRetry(callable, AzkabanClientStatus.class);
}
/**
@@ -314,33 +315,17 @@ public class AzkabanClient implements Closeable {
*
* @return A status object indicating if AJAX request is successful.
*/
- public AzkabanClientStatus uploadProjectZip(
- String projectName,
- File zipFile) {
- try {
- refreshSession();
- HttpPost httpPost = new HttpPost(this.url + "/manager");
- HttpEntity entity = MultipartEntityBuilder.create()
- .addTextBody(AzkabanClientParams.SESSION_ID, sessionId)
- .addTextBody(AzkabanClientParams.AJAX, "upload")
- .addTextBody(AzkabanClientParams.PROJECT, projectName)
- .addBinaryBody("file", zipFile,
- ContentType.create("application/zip"), zipFile.getName())
- .build();
- httpPost.setEntity(entity);
+ public AzkabanClientStatus uploadProjectZip(String projectName,
+ File zipFile) throws AzkabanClientException {
- CloseableHttpResponse response = this.httpClient.execute(httpPost);
+ AzkabanMultiCallables.UploadProjectCallable callable =
+ AzkabanMultiCallables.UploadProjectCallable.builder()
+ .client(this)
+ .projectName(projectName)
+ .zipFile(zipFile)
+ .build();
- try {
- handleResponse(response);
- return new AzkabanClientStatus.SUCCESS();
- } finally {
- response.close();
- }
- } catch (Exception e) {
- return new AzkabanClientStatus.FAIL("Azkaban client cannot upload zip to project = "
- + projectName, e);
- }
+ return runWithRetry(callable, AzkabanClientStatus.class);
}
/**
@@ -353,44 +338,20 @@ public class AzkabanClient implements Closeable {
*
* @return The status object which contains success status and execution id.
*/
- public AzkabanExecuteFlowStatus executeFlowWithOptions(
- String projectName,
- String flowName,
- Map<String, String> flowOptions,
- Map<String, String> flowParameters) {
-
- try {
- refreshSession();
- HttpPost httpPost = new HttpPost(this.url + "/executor");
- List<NameValuePair> nvps = new ArrayList<>();
- nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "executeFlow"));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.FLOW, flowName));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.CONCURRENT_OPTION, "ignore"));
-
- addFlowOptions(nvps, flowOptions);
- addFlowParameters(nvps, flowParameters);
-
- httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-
- Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
- Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
- httpPost.setHeaders(new Header[]{contentType, requestType});
-
- CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
- try {
- Map<String, String> map = handleResponse(response);
- return new AzkabanExecuteFlowStatus(
- new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID)));
- } finally {
- response.close();
- }
- } catch (Exception e) {
- return new AzkabanExecuteFlowStatus("Azkaban client cannot execute flow = "
- + flowName, e);
- }
+ public AzkabanExecuteFlowStatus executeFlowWithOptions(String projectName,
+ String flowName,
+ Map<String, String> flowOptions,
+ Map<String, String> flowParameters) throws AzkabanClientException {
+ AzkabanMultiCallables.ExecuteFlowCallable callable =
+ AzkabanMultiCallables.ExecuteFlowCallable.builder()
+ .client(this)
+ .projectName(projectName)
+ .flowName(flowName)
+ .flowOptions(flowOptions)
+ .flowParameters(flowParameters)
+ .build();
+
+ return runWithRetry(callable, AzkabanExecuteFlowStatus.class);
}
/**
@@ -402,104 +363,83 @@ public class AzkabanClient implements Closeable {
*
* @return The status object which contains success status and execution id.
*/
- public AzkabanExecuteFlowStatus executeFlow(
- String projectName,
- String flowName,
- Map<String, String> flowParameters) {
+ public AzkabanExecuteFlowStatus executeFlow(String projectName,
+ String flowName,
+ Map<String, String> flowParameters) throws AzkabanClientException {
return executeFlowWithOptions(projectName, flowName, null, flowParameters);
}
/**
* Cancel a flow by execution id.
*/
- public AzkabanClientStatus cancelFlow(int execId) {
- try {
- refreshSession();
- List<NameValuePair> nvps = new ArrayList<>();
- nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, String.valueOf(execId)));
-
- Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
- Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-
- HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
- httpGet.setHeaders(new Header[]{contentType, requestType});
-
- CloseableHttpResponse response = this.httpClient.execute(httpGet);
- try {
- handleResponse(response);
- return new AzkabanClientStatus.SUCCESS();
- } finally {
- response.close();
- }
- } catch (Exception e) {
- return new AzkabanClientStatus.FAIL("", e);
- }
+ public AzkabanClientStatus cancelFlow(String execId) throws AzkabanClientException {
+ AzkabanMultiCallables.CancelFlowCallable callable =
+ AzkabanMultiCallables.CancelFlowCallable.builder()
+ .client(this)
+ .execId(execId)
+ .build();
+
+ return runWithRetry(callable, AzkabanClientStatus.class);
}
+ /**
+ * Fetch an execution log.
+ */
+ public AzkabanClientStatus fetchExecutionLog(String execId,
+ String jobId,
+ String offset,
+ String length,
+ File ouf) throws AzkabanClientException {
+ AzkabanMultiCallables.FetchExecLogCallable callable =
+ AzkabanMultiCallables.FetchExecLogCallable.builder()
+ .client(this)
+ .execId(execId)
+ .jobId(jobId)
+ .offset(offset)
+ .length(length)
+ .output(ouf)
+ .build();
+
+ return runWithRetry(callable, AzkabanClientStatus.class);
+ }
/**
- * Given an execution id, fetches all the detailed information of that execution, including a list of all the job executions.
+ * Given an execution id, fetches all the detailed information of that execution,
+ * including a list of all the job executions.
*
* @param execId execution id to be fetched.
*
- * @return The status object which contains success status and all the detailed information of that execution.
+ * @return The status object which contains success status and all the detailed
+ * information of that execution.
*/
- public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) {
- try {
- refreshSession();
- List<NameValuePair> nvps = new ArrayList<>();
- nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow"));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
-
- Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
- Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-
- HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
- httpGet.setHeaders(new Header[]{contentType, requestType});
-
- CloseableHttpResponse response = this.httpClient.execute(httpGet);
- try {
- Map<String, String> map = handleResponse(response);
- return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
- } finally {
- response.close();
- }
- } catch (Exception e) {
- return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
- + "fetch execId " + execId, e);
- }
+ public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) throws AzkabanClientException {
+ AzkabanMultiCallables.FetchFlowExecCallable callable =
+ AzkabanMultiCallables.FetchFlowExecCallable.builder()
+ .client(this)
+ .execId(execId)
+ .build();
+
+ return runWithRetry(callable, AzkabanFetchExecuteFlowStatus.class);
}
- private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) {
- if (flowParams != null) {
- for (Map.Entry<String, String> entry : flowParams.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
- log.debug("New flow parameter added:" + key + "-->" + value);
- nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value));
- }
- }
- }
- }
-
- private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) {
- if (flowOptions != null) {
- for (Map.Entry<String, String> option : flowOptions.entrySet()) {
- log.debug("New flow option added:" + option .getKey()+ "-->" + option.getValue());
- nvps.add(new BasicNameValuePair(option.getKey(), option.getValue()));
+ private <T> T runWithRetry(Callable callable, Class<T> cls) throws AzkabanClientException {
+ try {
+ AzkabanClientStatus status = this.retryer.call(callable);
+ if (status.getClass().equals(cls)) {
+ return ((T)status);
}
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause(), AzkabanClientException.class);
+ } catch (RetryException e) {
+ throw new AzkabanClientException("RetryException occurred ", e);
}
+ // should never reach to here.
+ throw new UnreachableStatementException("Cannot reach here.");
}
@Override
public void close()
throws IOException {
- if (!customHttpClientProvided) {
- this.httpClient.close();
- }
+ this.closer.close();
}
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
index da94c03..5e4a2b0 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
@@ -25,7 +25,7 @@ import java.io.IOException;
public class AzkabanClientException extends IOException {
private static final long serialVersionUID = 11324144L;
- public AzkabanClientException(String message, Exception e) {
+ public AzkabanClientException(String message, Throwable e) {
super(message, e);
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
index 27bb4f2..6625d12 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
@@ -39,5 +39,8 @@ public class AzkabanClientParams {
public static final String STATUS = "status";
public static final String ERROR = "error";
public static final String EXECID = "execid";
-
+ public static final String JOBID = "jobId";
+ public static final String DATA = "data";
+ public static final String OFFSET = "offset";
+ public static final String LENGTH = "length";
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
index 132f4ad..7ff9c1a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
@@ -21,10 +21,6 @@ import lombok.Getter;
@Getter
public abstract class AzkabanClientStatus<RS> {
- private boolean success = true;
- private String failMsg = "";
- private Throwable throwable = null;
-
private RS response = null;
public AzkabanClientStatus() {
@@ -33,28 +29,4 @@ public abstract class AzkabanClientStatus<RS> {
public AzkabanClientStatus(RS response) {
this.response = response;
}
-
- public AzkabanClientStatus(String failMsg, Throwable throwable) {
- this.success = false;
- this.failMsg = failMsg;
- this.throwable = throwable;
- }
-
- /**
- * This status captures basic success.
- */
- public static class SUCCESS extends AzkabanClientStatus<Object> {
- public SUCCESS() {
- super();
- }
- }
-
- /**
- * This status captures basic failure (fail message and throwable).
- */
- public static class FAIL extends AzkabanClientStatus<Object> {
- public FAIL(String failMsg, Throwable throwable) {
- super(failMsg, throwable);
- }
- }
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
index be9af89..2908ec4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
@@ -29,10 +29,6 @@ public class AzkabanExecuteFlowStatus extends AzkabanClientStatus<AzkabanExecute
super(executeId);
}
- public AzkabanExecuteFlowStatus(String failMsg, Throwable throwable) {
- super(failMsg, throwable);
- }
-
@Getter
@AllArgsConstructor
public static class ExecuteId {
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
index 1e0afe8..5a52cad 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
@@ -33,10 +33,6 @@ public class AzkabanFetchExecuteFlowStatus extends AzkabanClientStatus<AzkabanFe
super(exec);
}
- public AzkabanFetchExecuteFlowStatus(String failMsg, Throwable throwable) {
- super(failMsg, throwable);
- }
-
@Getter
@AllArgsConstructor
public static class Execution {
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
new file mode 100644
index 0000000..9091220
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * This class encapsulates all the operations an {@link AzkabanClient} can do.
+ */
+class AzkabanMultiCallables {
+
+ /**
+ * This class can never been instantiated.
+ */
+ private AzkabanMultiCallables() {
+ }
+
+ /**
+ * A callable that will create a project on Azkaban.
+ */
+ @Builder
+ static class CreateProjectCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String projectName;
+ private String description;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanClientStatus call()
+ throws AzkabanClientException {
+
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ HttpPost httpPost = new HttpPost(client.url + "/manager");
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "create"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, projectName));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.DESCRIPTION, description));
+ httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+ Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+ Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+ httpPost.setHeaders(new Header[]{contentType, requestType});
+
+ CloseableHttpResponse response = client.httpClient.execute(httpPost);
+ closer.register(response);
+ client.handleResponse(response);
+ return new AzkabanSuccess();
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Throwable e) {
+ throw new AzkabanClientException("Azkaban client cannot create project = "
+ + projectName, e);
+ }
+ }
+ }
+
+ /**
+ * A callable that will delete a project on Azkaban.
+ */
+ @Builder
+ static class DeleteProjectCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String projectName;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanClientStatus call()
+ throws AzkabanClientException {
+
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.DELETE, "true"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+
+ Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+ Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+ HttpGet httpGet = new HttpGet(client.url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
+ httpGet.setHeaders(new Header[]{contentType, requestType});
+
+ CloseableHttpResponse response = client.httpClient.execute(httpGet);
+ closer.register(response);
+ client.handleResponse(response);
+ return new AzkabanSuccess();
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Throwable e) {
+ throw new AzkabanClientException("Azkaban client cannot delete project = "
+ + projectName, e);
+ }
+ }
+ }
+
+ /**
+ * A callable that will execute a flow on Azkaban.
+ */
+ @Builder
+ static class UploadProjectCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String projectName;
+ private File zipFile;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanClientStatus call()
+ throws AzkabanClientException {
+
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ HttpPost httpPost = new HttpPost(client.url + "/manager");
+ HttpEntity entity = MultipartEntityBuilder.create()
+ .addTextBody(AzkabanClientParams.SESSION_ID, client.sessionId)
+ .addTextBody(AzkabanClientParams.AJAX, "upload")
+ .addTextBody(AzkabanClientParams.PROJECT, projectName)
+ .addBinaryBody("file", zipFile,
+ ContentType.create("application/zip"), zipFile.getName())
+ .build();
+ httpPost.setEntity(entity);
+
+ CloseableHttpResponse response = client.httpClient.execute(httpPost);
+ closer.register(response);
+ client.handleResponse(response);
+ return new AzkabanSuccess();
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Throwable e) {
+ throw new AzkabanClientException("Azkaban client cannot upload zip to project = "
+ + projectName, e);
+ }
+ }
+ }
+
+ /**
+ * A callable that will execute a flow on Azkaban.
+ */
+ @Builder
+ static class ExecuteFlowCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String projectName;
+ private String flowName;
+ private Map<String, String> flowOptions;
+ private Map<String, String> flowParameters;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanExecuteFlowStatus call()
+ throws AzkabanClientException {
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ HttpPost httpPost = new HttpPost(client.url + "/executor");
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "executeFlow"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.FLOW, flowName));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.CONCURRENT_OPTION, "ignore"));
+
+ addFlowOptions(nvps, flowOptions);
+ addFlowParameters(nvps, flowParameters);
+
+ httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+ Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+ Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+ httpPost.setHeaders(new Header[]{contentType, requestType});
+
+ CloseableHttpResponse response = client.httpClient.execute(httpPost);
+ closer.register(response);
+ Map<String, String> map = client.handleResponse(response);
+ return new AzkabanExecuteFlowStatus(
+ new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID)));
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Exception e) {
+ throw new AzkabanClientException("Azkaban client cannot execute flow = "
+ + flowName, e);
+ }
+ }
+
+ private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) {
+ if (flowParams != null) {
+ for (Map.Entry<String, String> entry : flowParams.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+ nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value));
+ }
+ }
+ }
+ }
+
+ private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) {
+ if (flowOptions != null) {
+ for (Map.Entry<String, String> option : flowOptions.entrySet()) {
+ nvps.add(new BasicNameValuePair(option.getKey(), option.getValue()));
+ }
+ }
+ }
+ }
+
+ /**
+ * A callable that will cancel a flow on Azkaban.
+ */
+ @Builder
+ static class CancelFlowCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String execId;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanClientStatus call()
+ throws AzkabanClientException {
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, String.valueOf(execId)));
+
+ Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+ Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+ HttpGet httpGet = new HttpGet(client.url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+ httpGet.setHeaders(new Header[]{contentType, requestType});
+
+ CloseableHttpResponse response = client.httpClient.execute(httpGet);
+ closer.register(response);
+ client.handleResponse(response);
+ return new AzkabanSuccess();
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Exception e) {
+ throw new AzkabanClientException("Azkaban client cannot cancel flow execId = "
+ + execId, e);
+ }
+ }
+ }
+
+ /**
+ * A callable that will fetch a flow status on Azkaban.
+ */
+ @Builder
+ static class FetchFlowExecCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String execId;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanFetchExecuteFlowStatus call()
+ throws AzkabanClientException {
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
+
+ Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+ Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+ HttpGet httpGet = new HttpGet(client.url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+ httpGet.setHeaders(new Header[]{contentType, requestType});
+
+ CloseableHttpResponse response = client.httpClient.execute(httpGet);
+ closer.register(response);
+ Map<String, String> map = client.handleResponse(response);
+ return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Exception e) {
+ throw new AzkabanClientException("Azkaban client cannot "
+ + "fetch execId " + execId, e);
+ }
+ }
+ }
+
+ /**
+ * A callable that will fetch a flow log on Azkaban.
+ */
+ @Builder
+ static class FetchExecLogCallable implements Callable<AzkabanClientStatus> {
+ private AzkabanClient client;
+ private String execId;
+ private String jobId;
+ private String offset;
+ private String length;
+ private File output;
+ private boolean invalidSession = false;
+
+ @Override
+ public AzkabanClientStatus call()
+ throws AzkabanClientException {
+ try (Closer closer = Closer.create()) {
+ client.refreshSession(this.invalidSession);
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchExecJobLogs"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.JOBID, jobId));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.OFFSET, offset));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.LENGTH, length));
+
+ Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+ Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+ HttpGet httpGet = new HttpGet(client.url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+ httpGet.setHeaders(new Header[]{contentType, requestType});
+
+ CloseableHttpResponse response = client.httpClient.execute(httpGet);
+ closer.register(response);
+ Map<String, String> map = client.handleResponse(response);
+ FileUtils.writeStringToFile(output, map.get(AzkabanClientParams.DATA), Charsets.UTF_8);
+ return new AzkabanSuccess();
+ } catch (InvalidSessionException e) {
+ this.invalidSession = true;
+ throw e;
+ } catch (Exception e) {
+ throw new AzkabanClientException("Azkaban client cannot "
+ + "fetch execId " + execId, e);
+ }
+ }
+ }
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSuccess.java
similarity index 71%
copy from gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
copy to gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSuccess.java
index da94c03..07b8391 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSuccess.java
@@ -17,19 +17,15 @@
package org.apache.gobblin.service.modules.orchestration;
-import java.io.IOException;
-
/**
- * Exception raised by {@link AzkabanClient}.
+ * A successful status for {@link AzkabanClient}.
*/
-public class AzkabanClientException extends IOException {
- private static final long serialVersionUID = 11324144L;
-
- public AzkabanClientException(String message, Exception e) {
- super(message, e);
+public class AzkabanSuccess extends AzkabanClientStatus<String> {
+ public AzkabanSuccess() {
+ this("");
}
- public AzkabanClientException(String message) {
- super(message);
+ public AzkabanSuccess(String response) {
+ super(response);
}
-}
\ No newline at end of file
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/InvalidSessionException.java
similarity index 75%
copy from gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
copy to gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/InvalidSessionException.java
index da94c03..77d278f 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/InvalidSessionException.java
@@ -17,19 +17,15 @@
package org.apache.gobblin.service.modules.orchestration;
-import java.io.IOException;
-
/**
- * Exception raised by {@link AzkabanClient}.
+ * Used by {@link AzkabanClient} to indicate current session is invalid.
*/
-public class AzkabanClientException extends IOException {
- private static final long serialVersionUID = 11324144L;
-
- public AzkabanClientException(String message, Exception e) {
+public class InvalidSessionException extends AzkabanClientException {
+ public InvalidSessionException(String message, Exception e) {
super(message, e);
}
- public AzkabanClientException(String message) {
+ public InvalidSessionException(String message) {
super(message);
}
-}
\ No newline at end of file
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/UnreachableStatementException.java
similarity index 75%
copy from gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
copy to gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/UnreachableStatementException.java
index da94c03..7e62fd7 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/UnreachableStatementException.java
@@ -17,19 +17,15 @@
package org.apache.gobblin.service.modules.orchestration;
-import java.io.IOException;
-
/**
- * Exception raised by {@link AzkabanClient}.
+ * Used by {@link AzkabanClient} to indicate an unreachable code block.
*/
-public class AzkabanClientException extends IOException {
- private static final long serialVersionUID = 11324144L;
-
- public AzkabanClientException(String message, Exception e) {
+public class UnreachableStatementException extends AzkabanClientException {
+ public UnreachableStatementException(String message, Exception e) {
super(message, e);
}
- public AzkabanClientException(String message) {
+ public UnreachableStatementException(String message) {
super(message);
}
-}
\ No newline at end of file
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
index edefe3e..e648830 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -70,41 +70,44 @@ public class AzkabanClientTest {
this.client.close();
}
- private void ensureProjectExist(String projectName, String description) {
- AzkabanClientStatus status;
+ private void ensureProjectExist(String projectName, String description) throws AzkabanClientException {
// make sure it is in a clean state
- status = this.client.deleteProject(projectName);
- Assert.assertTrue(status.isSuccess());
+ this.client.deleteProject(projectName);
// make sure the project is created successfully
- status = this.client.createProject(projectName, description);
- Assert.assertTrue(status.isSuccess());
+ this.client.createProject(projectName, description);
}
@Test(enabled = false)
- public void testCreateProject() {
+ public void testFetchLog() throws AzkabanClientException {
+ String execId = "11211956";
+ String jobId = "tracking-hourly-bucket1";
+
+ // fetch log
+ this.client.fetchExecutionLog(execId, jobId, "0", "100000000", new File("/tmp/sample.log"));
+ }
+
+
+ @Test(enabled = false)
+ public void testCreateProject() throws AzkabanClientException {
String projectName = "project-create";
String description = "This is a create project test.";
- AzkabanClientStatus status;
ensureProjectExist(projectName, description);
// the second time creation should fail
- status = this.client.createProject(projectName, description);
- Assert.assertFalse(status.isSuccess());
+ this.client.createProject(projectName, description);
}
@Test(enabled = false)
- public void testDeleteProject() {
+ public void testDeleteProject() throws AzkabanClientException {
String projectName = "project-delete";
String description = "This is a delete project test.";
- AzkabanClientStatus status;
ensureProjectExist(projectName, description);
// delete the new project
- status = this.client.deleteProject(projectName);
- Assert.assertTrue(status.isSuccess());
+ this.client.deleteProject(projectName);
}
@Test(enabled = false)
@@ -112,18 +115,20 @@ public class AzkabanClientTest {
String projectName = "project-upload";
String description = "This is a upload project test.";
String flowName = "test-upload";
- AzkabanClientStatus status;
ensureProjectExist(projectName, description);
// upload Zip to project
File zipFile = createAzkabanZip(flowName);
- status = this.client.uploadProjectZip(projectName, zipFile);
- Assert.assertTrue(status.isSuccess());
+ this.client.uploadProjectZip(projectName, zipFile);
// upload Zip to an non-existed project
- status = this.client.uploadProjectZip("Non-existed-project", zipFile);
- Assert.assertFalse(status.isSuccess());
+ try {
+ this.client.uploadProjectZip("Non-existed-project", zipFile);
+ Assert.fail();
+ } catch (Exception e) {
+ log.info("Expected exception " + e.toString());
+ }
}
@Test(enabled = false)
@@ -136,12 +141,10 @@ public class AzkabanClientTest {
// upload Zip to project
File zipFile = createAzkabanZip(flowName);
- AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
- Assert.assertTrue(status.isSuccess());
+ this.client.uploadProjectZip(projectName, zipFile);
// execute a flow
AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, flowName, Maps.newHashMap());
- Assert.assertTrue(execStatus.isSuccess());
log.info("Execid: {}", execStatus.getResponse().execId);
}
@@ -155,8 +158,7 @@ public class AzkabanClientTest {
// upload Zip to project
File zipFile = createAzkabanZip(flowName);
- AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
- Assert.assertTrue(status.isSuccess());
+ this.client.uploadProjectZip(projectName, zipFile);
Map<String, String> flowParams = Maps.newHashMap();
flowParams.put("gobblin.source", "DummySource");
@@ -164,7 +166,6 @@ public class AzkabanClientTest {
// execute a flow
AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, flowName, flowParams);
- Assert.assertTrue(execStatus.isSuccess());
log.info("Execid: {}", execStatus.getResponse().execId);
}
@@ -178,14 +179,12 @@ public class AzkabanClientTest {
// upload Zip to project
File zipFile = createAzkabanZip(flowName);
- AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
- Assert.assertTrue(status.isSuccess());
+ this.client.uploadProjectZip(projectName, zipFile);
Map<String, String> flowOptions = Maps.newHashMap();
// execute a flow
AzkabanExecuteFlowStatus execStatus = this.client.executeFlowWithOptions(projectName, flowName, flowOptions, Maps.newHashMap());
- Assert.assertTrue(execStatus.isSuccess());
log.info("Execid: {}", execStatus.getResponse().execId);
}
@@ -199,14 +198,12 @@ public class AzkabanClientTest {
// upload Zip to project
File zipFile = createAzkabanZip(flowName);
- AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
- Assert.assertTrue(status.isSuccess());
+ this.client.uploadProjectZip(projectName, zipFile);
Map<String, String> flowOptions = Maps.newHashMap();
// execute a flow
AzkabanExecuteFlowStatus execStatus = this.client.executeFlowWithOptions(projectName, flowName, flowOptions, Maps.newHashMap());
- Assert.assertTrue(execStatus.isSuccess());
log.info("Execid: {}", execStatus.getResponse().execId);
// wait for the job started and failed
@@ -214,7 +211,9 @@ public class AzkabanClientTest {
// job should fail
AzkabanFetchExecuteFlowStatus fetchExecuteFlowStatus = this.client.fetchFlowExecution(execStatus.getResponse().execId);
- Assert.assertTrue(fetchExecuteFlowStatus.isSuccess());
+ for (Map.Entry<String, String> entry : fetchExecuteFlowStatus.getResponse().getMap().entrySet()) {
+ log.info(entry.getKey() + " -> " + entry.getValue());
+ }
}
@Test(enabled = false)