You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/20 22:32:30 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-683] Add azkaban client retry logic.

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b6762  [GOBBLIN-683] Add azkaban client retry logic.
86b6762 is described below

commit 86b67626371720a57fc73bdc3172b4ace2803eb2
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Feb 20 14:32:25 2019 -0800

    [GOBBLIN-683] Add azkaban client retry logic.
    
    Closes #2555 from yukuai518/session
---
 .../modules/orchestration/AzkabanClient.java       | 358 ++++++++-----------
 .../orchestration/AzkabanClientException.java      |   2 +-
 .../modules/orchestration/AzkabanClientParams.java |   5 +-
 .../modules/orchestration/AzkabanClientStatus.java |  28 --
 .../orchestration/AzkabanExecuteFlowStatus.java    |   4 -
 .../AzkabanFetchExecuteFlowStatus.java             |   4 -
 .../orchestration/AzkabanMultiCallables.java       | 378 +++++++++++++++++++++
 ...banClientException.java => AzkabanSuccess.java} |  18 +-
 ...Exception.java => InvalidSessionException.java} |  14 +-
 ...ion.java => UnreachableStatementException.java} |  14 +-
 .../modules/orchestration/AzkabanClientTest.java   |  63 ++--
 11 files changed, 580 insertions(+), 308 deletions(-)

diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index ec6bce4..1f9293a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -21,47 +21,50 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
 import org.apache.http.HttpEntity;
-import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
-import org.apache.http.NameValuePair;
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URLEncodedUtils;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.mime.MultipartEntityBuilder;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.SimpleTimeLimiter;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
 import lombok.Builder;
 
+import org.apache.gobblin.util.ExecutorsUtils;
+
 
 /**
  * A simple http based client that uses Ajax API to communicate with Azkaban server.
@@ -72,15 +75,16 @@ import lombok.Builder;
  */
 public class AzkabanClient implements Closeable {
   protected final String username;
+  protected final String password;
   protected final String url;
   protected final long sessionExpireInMin; // default value is 12h.
   protected SessionManager sessionManager;
-  protected String password;
   protected String sessionId;
   protected long sessionCreationTime = 0;
   protected CloseableHttpClient httpClient;
-
-  private boolean customHttpClientProvided = true;
+  private ExecutorService executorService;
+  private Closer closer = Closer.create();
+  private Retryer<AzkabanClientStatus> retryer;
   private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
 
   /**
@@ -92,7 +96,8 @@ public class AzkabanClient implements Closeable {
                           String url,
                           long sessionExpireInMin,
                           CloseableHttpClient httpClient,
-                          SessionManager sessionManager)
+                          SessionManager sessionManager,
+                          ExecutorService executorService)
       throws AzkabanClientException {
     this.username = username;
     this.password = password;
@@ -100,9 +105,16 @@ public class AzkabanClient implements Closeable {
     this.sessionExpireInMin = sessionExpireInMin;
     this.httpClient = httpClient;
     this.sessionManager = sessionManager;
+    this.executorService = executorService;
     this.initializeClient();
     this.initializeSessionManager();
-
+    this.intializeExecutorService();
+    this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
+        .retryIfExceptionOfType(InvalidSessionException.class)
+        .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, TimeUnit.SECONDS, this.executorService))
+        .withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
+        .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+        .build();
     this.sessionId = this.sessionManager.fetchSession();
     this.sessionCreationTime = System.nanoTime();
   }
@@ -110,7 +122,7 @@ public class AzkabanClient implements Closeable {
   private void initializeClient() throws AzkabanClientException {
     if (this.httpClient == null) {
       this.httpClient = createHttpClient();
-      this.customHttpClientProvided = false;
+      this.closer.register(this.httpClient);
     }
   }
 
@@ -123,6 +135,12 @@ public class AzkabanClient implements Closeable {
     }
   }
 
+  private void intializeExecutorService() {
+    if (this.executorService == null) {
+      this.executorService = Executors.newFixedThreadPool(30);
+    }
+  }
+
   /**
    * Create a {@link CloseableHttpClient} used to communicate with Azkaban server.
    * Derived class can configure different http client by overriding this method.
@@ -159,10 +177,19 @@ public class AzkabanClient implements Closeable {
   /**
    * When current session expired, use {@link SessionManager} to refresh the session id.
    */
-  private void refreshSession() throws AzkabanClientException {
+  void refreshSession(boolean forceRefresh) throws AzkabanClientException {
     Preconditions.checkArgument(this.sessionCreationTime != 0);
-    if ((System.nanoTime() - this.sessionCreationTime) > Duration.ofMinutes(this.sessionExpireInMin).toNanos()) {
+    boolean expired = (System.nanoTime() - this.sessionCreationTime) > Duration
+        .ofMinutes(this.sessionExpireInMin)
+        .toNanos();
+
+    if (expired) {
       log.info("Session expired. Generating a new session.");
+    } else if (forceRefresh) {
+      log.info("Force to refresh session. Generating a new session.");
+    }
+
+    if (expired || forceRefresh) {
       this.sessionId = this.sessionManager.fetchSession();
       this.sessionCreationTime = System.nanoTime();
     }
@@ -226,6 +253,11 @@ public class AzkabanClient implements Closeable {
         .replaceAll("\"", ""))) {
       String message = (null != jsonObject.get(AzkabanClientParams.MESSAGE)) ? jsonObject.get(AzkabanClientParams.MESSAGE).toString()
           .replaceAll("\"", "") : "Unknown issue";
+
+      if (message.contains("Invalid Session")) {
+        throw new InvalidSessionException(message);
+      }
+
       throw new IOException(message);
     }
 
@@ -243,34 +275,16 @@ public class AzkabanClient implements Closeable {
    *
    * @return A status object indicating if AJAX request is successful.
    */
-  public AzkabanClientStatus createProject(
-      String projectName,
-      String description) {
-    try {
-      refreshSession();
-      HttpPost httpPost = new HttpPost(this.url + "/manager");
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "create"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, projectName));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.DESCRIPTION, description));
-      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-      httpPost.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
-      try {
-        handleResponse(response);
-        return new AzkabanClientStatus.SUCCESS();
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanClientStatus.FAIL("Azkaban client cannot create project.", e);
-    }
+  public AzkabanClientStatus createProject(String projectName,
+                                           String description) throws AzkabanClientException {
+    AzkabanMultiCallables.CreateProjectCallable callable =
+       AzkabanMultiCallables.CreateProjectCallable.builder()
+           .client(this)
+           .projectName(projectName)
+           .description(description)
+           .build();
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
   /**
@@ -281,28 +295,15 @@ public class AzkabanClient implements Closeable {
    *
    * @return A status object indicating if AJAX request is successful.
    */
-  public AzkabanClientStatus deleteProject(String projectName) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.DELETE, "true"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+  public AzkabanClientStatus deleteProject(String projectName) throws AzkabanClientException {
 
-      HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
+    AzkabanMultiCallables.DeleteProjectCallable callable =
+        AzkabanMultiCallables.DeleteProjectCallable.builder()
+            .client(this)
+            .projectName(projectName)
+            .build();
 
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      response.close();
-      return new AzkabanClientStatus.SUCCESS();
-
-    } catch (Exception e) {
-      return new AzkabanClientStatus.FAIL("Azkaban client cannot delete project = "
-          + projectName, e);
-    }
+    return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
   /**
@@ -314,33 +315,17 @@ public class AzkabanClient implements Closeable {
    *
    * @return A status object indicating if AJAX request is successful.
    */
-  public AzkabanClientStatus uploadProjectZip(
-      String projectName,
-      File zipFile) {
-    try {
-      refreshSession();
-      HttpPost httpPost = new HttpPost(this.url + "/manager");
-      HttpEntity entity = MultipartEntityBuilder.create()
-          .addTextBody(AzkabanClientParams.SESSION_ID, sessionId)
-          .addTextBody(AzkabanClientParams.AJAX, "upload")
-          .addTextBody(AzkabanClientParams.PROJECT, projectName)
-          .addBinaryBody("file", zipFile,
-              ContentType.create("application/zip"), zipFile.getName())
-          .build();
-      httpPost.setEntity(entity);
+  public AzkabanClientStatus uploadProjectZip(String projectName,
+                                              File zipFile) throws AzkabanClientException {
 
-      CloseableHttpResponse response = this.httpClient.execute(httpPost);
+    AzkabanMultiCallables.UploadProjectCallable callable =
+        AzkabanMultiCallables.UploadProjectCallable.builder()
+            .client(this)
+            .projectName(projectName)
+            .zipFile(zipFile)
+            .build();
 
-      try {
-        handleResponse(response);
-        return new AzkabanClientStatus.SUCCESS();
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanClientStatus.FAIL("Azkaban client cannot upload zip to project = "
-          + projectName, e);
-    }
+    return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
   /**
@@ -353,44 +338,20 @@ public class AzkabanClient implements Closeable {
    *
    * @return The status object which contains success status and execution id.
    */
-  public AzkabanExecuteFlowStatus executeFlowWithOptions(
-      String projectName,
-      String flowName,
-      Map<String, String> flowOptions,
-      Map<String, String> flowParameters) {
-
-    try {
-      refreshSession();
-      HttpPost httpPost = new HttpPost(this.url + "/executor");
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "executeFlow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.FLOW, flowName));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.CONCURRENT_OPTION, "ignore"));
-
-      addFlowOptions(nvps, flowOptions);
-      addFlowParameters(nvps, flowParameters);
-
-      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-      httpPost.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
-      try {
-        Map<String, String> map = handleResponse(response);
-        return new AzkabanExecuteFlowStatus(
-            new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID)));
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanExecuteFlowStatus("Azkaban client cannot execute flow = "
-          + flowName, e);
-    }
+  public AzkabanExecuteFlowStatus executeFlowWithOptions(String projectName,
+                                                         String flowName,
+                                                         Map<String, String> flowOptions,
+                                                         Map<String, String> flowParameters) throws AzkabanClientException {
+    AzkabanMultiCallables.ExecuteFlowCallable callable =
+        AzkabanMultiCallables.ExecuteFlowCallable.builder()
+            .client(this)
+            .projectName(projectName)
+            .flowName(flowName)
+            .flowOptions(flowOptions)
+            .flowParameters(flowParameters)
+            .build();
+
+    return runWithRetry(callable, AzkabanExecuteFlowStatus.class);
   }
 
   /**
@@ -402,104 +363,83 @@ public class AzkabanClient implements Closeable {
    *
    * @return The status object which contains success status and execution id.
    */
-  public AzkabanExecuteFlowStatus executeFlow(
-      String projectName,
-      String flowName,
-      Map<String, String> flowParameters) {
+  public AzkabanExecuteFlowStatus executeFlow(String projectName,
+                                              String flowName,
+                                              Map<String, String> flowParameters) throws AzkabanClientException {
     return executeFlowWithOptions(projectName, flowName, null, flowParameters);
   }
 
   /**
    * Cancel a flow by execution id.
    */
-  public AzkabanClientStatus cancelFlow(int execId) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, String.valueOf(execId)));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-
-      HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      try {
-        handleResponse(response);
-        return new AzkabanClientStatus.SUCCESS();
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanClientStatus.FAIL("", e);
-    }
+  public AzkabanClientStatus cancelFlow(String execId) throws AzkabanClientException {
+    AzkabanMultiCallables.CancelFlowCallable callable =
+        AzkabanMultiCallables.CancelFlowCallable.builder()
+            .client(this)
+            .execId(execId)
+            .build();
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
+  /**
+   * Fetch an execution log.
+   */
+  public AzkabanClientStatus fetchExecutionLog(String execId,
+                                               String jobId,
+                                               String offset,
+                                               String length,
+                                               File ouf) throws AzkabanClientException {
+    AzkabanMultiCallables.FetchExecLogCallable callable =
+        AzkabanMultiCallables.FetchExecLogCallable.builder()
+            .client(this)
+            .execId(execId)
+            .jobId(jobId)
+            .offset(offset)
+            .length(length)
+            .output(ouf)
+            .build();
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
+  }
 
   /**
-   * Given an execution id, fetches all the detailed information of that execution, including a list of all the job executions.
+   * Given an execution id, fetches all the detailed information of that execution,
+   * including a list of all the job executions.
    *
    * @param execId execution id to be fetched.
    *
-   * @return The status object which contains success status and all the detailed information of that execution.
+   * @return The status object which contains success status and all the detailed
+   *         information of that execution.
    */
-  public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-
-      HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      try {
-        Map<String, String> map = handleResponse(response);
-        return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
-          + "fetch execId " + execId, e);
-    }
+  public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) throws AzkabanClientException {
+    AzkabanMultiCallables.FetchFlowExecCallable callable =
+        AzkabanMultiCallables.FetchFlowExecCallable.builder()
+            .client(this)
+            .execId(execId)
+            .build();
+
+    return runWithRetry(callable, AzkabanFetchExecuteFlowStatus.class);
   }
 
-  private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) {
-    if (flowParams != null) {
-      for (Map.Entry<String, String> entry : flowParams.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
-          log.debug("New flow parameter added:" + key + "-->" + value);
-          nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value));
-        }
-      }
-    }
-  }
-
-  private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) {
-    if (flowOptions != null) {
-      for (Map.Entry<String, String> option : flowOptions.entrySet()) {
-        log.debug("New flow option added:" + option .getKey()+ "-->" + option.getValue());
-        nvps.add(new BasicNameValuePair(option.getKey(), option.getValue()));
+  private <T> T runWithRetry(Callable callable, Class<T> cls) throws AzkabanClientException {
+    try {
+      AzkabanClientStatus status = this.retryer.call(callable);
+      if (status.getClass().equals(cls)) {
+        return ((T)status);
       }
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), AzkabanClientException.class);
+    } catch (RetryException e) {
+      throw new AzkabanClientException("RetryException occurred ", e);
     }
+    // should never reach to here.
+    throw new UnreachableStatementException("Cannot reach here.");
   }
 
   @Override
   public void close()
       throws IOException {
-    if (!customHttpClientProvided) {
-      this.httpClient.close();
-    }
+    this.closer.close();
   }
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
index da94c03..5e4a2b0 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 public class AzkabanClientException extends IOException {
   private static final long serialVersionUID = 11324144L;
 
-  public AzkabanClientException(String message, Exception e) {
+  public AzkabanClientException(String message, Throwable e) {
     super(message, e);
   }
 
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
index 27bb4f2..6625d12 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientParams.java
@@ -39,5 +39,8 @@ public class AzkabanClientParams {
   public static final String STATUS = "status";
   public static final String ERROR = "error";
   public static final String EXECID = "execid";
-
+  public static final String JOBID = "jobId";
+  public static final String DATA = "data";
+  public static final String OFFSET = "offset";
+  public static final String LENGTH = "length";
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
index 132f4ad..7ff9c1a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientStatus.java
@@ -21,10 +21,6 @@ import lombok.Getter;
 
 @Getter
 public abstract class AzkabanClientStatus<RS> {
-  private boolean success = true;
-  private String failMsg = "";
-  private Throwable throwable = null;
-
   private RS response = null;
 
   public AzkabanClientStatus() {
@@ -33,28 +29,4 @@ public abstract class AzkabanClientStatus<RS> {
   public AzkabanClientStatus(RS response) {
     this.response = response;
   }
-
-  public AzkabanClientStatus(String failMsg, Throwable throwable) {
-    this.success = false;
-    this.failMsg = failMsg;
-    this.throwable = throwable;
-  }
-
-  /**
-   * This status captures basic success.
-   */
-  public static class SUCCESS extends AzkabanClientStatus<Object> {
-    public SUCCESS() {
-      super();
-    }
-  }
-
-  /**
-   * This status captures basic failure (fail message and throwable).
-   */
-  public static class FAIL extends AzkabanClientStatus<Object> {
-    public FAIL(String failMsg, Throwable throwable) {
-      super(failMsg, throwable);
-    }
-  }
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
index be9af89..2908ec4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanExecuteFlowStatus.java
@@ -29,10 +29,6 @@ public class AzkabanExecuteFlowStatus extends AzkabanClientStatus<AzkabanExecute
     super(executeId);
   }
 
-  public AzkabanExecuteFlowStatus(String failMsg, Throwable throwable) {
-    super(failMsg, throwable);
-  }
-
   @Getter
   @AllArgsConstructor
   public static class ExecuteId {
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
index 1e0afe8..5a52cad 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanFetchExecuteFlowStatus.java
@@ -33,10 +33,6 @@ public class AzkabanFetchExecuteFlowStatus extends AzkabanClientStatus<AzkabanFe
     super(exec);
   }
 
-  public AzkabanFetchExecuteFlowStatus(String failMsg, Throwable throwable) {
-    super(failMsg, throwable);
-  }
-
   @Getter
   @AllArgsConstructor
   public static class Execution {
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
new file mode 100644
index 0000000..9091220
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanMultiCallables.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * This class encapsulates all the operations an {@link AzkabanClient} can do.
+ */
+class AzkabanMultiCallables {
+
+  /**
+   * This class can never been instantiated.
+   */
+  private AzkabanMultiCallables() {
+  }
+
+  /**
+   * A callable that will create a project on Azkaban.
+   */
+  @Builder
+  static class CreateProjectCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String projectName;
+    private String description;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanClientStatus call()
+        throws AzkabanClientException {
+
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        HttpPost httpPost = new HttpPost(client.url + "/manager");
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "create"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.NAME, projectName));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.DESCRIPTION, description));
+        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+        httpPost.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpPost);
+        closer.register(response);
+        client.handleResponse(response);
+        return new AzkabanSuccess();
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Throwable e) {
+        throw new AzkabanClientException("Azkaban client cannot create project = "
+            + projectName, e);
+      }
+    }
+  }
+
+  /**
+   * A callable that will delete a project on Azkaban.
+   */
+  @Builder
+  static class DeleteProjectCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String projectName;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanClientStatus call()
+        throws AzkabanClientException {
+
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.DELETE, "true"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+        HttpGet httpGet = new HttpGet(client.url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
+        httpGet.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpGet);
+        closer.register(response);
+        client.handleResponse(response);
+        return new AzkabanSuccess();
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Throwable e) {
+        throw new AzkabanClientException("Azkaban client cannot delete project = "
+            + projectName, e);
+      }
+    }
+  }
+
+  /**
+   * A callable that will execute a flow on Azkaban.
+   */
+  @Builder
+  static class UploadProjectCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String projectName;
+    private File zipFile;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanClientStatus call()
+        throws AzkabanClientException {
+
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        HttpPost httpPost = new HttpPost(client.url + "/manager");
+        HttpEntity entity = MultipartEntityBuilder.create()
+            .addTextBody(AzkabanClientParams.SESSION_ID, client.sessionId)
+            .addTextBody(AzkabanClientParams.AJAX, "upload")
+            .addTextBody(AzkabanClientParams.PROJECT, projectName)
+            .addBinaryBody("file", zipFile,
+                ContentType.create("application/zip"), zipFile.getName())
+            .build();
+        httpPost.setEntity(entity);
+
+        CloseableHttpResponse response = client.httpClient.execute(httpPost);
+        closer.register(response);
+        client.handleResponse(response);
+        return new AzkabanSuccess();
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Throwable e) {
+        throw new AzkabanClientException("Azkaban client cannot upload zip to project = "
+            + projectName, e);
+      }
+    }
+  }
+
+  /**
+   * A callable that will execute a flow on Azkaban.
+   */
+  @Builder
+  static class ExecuteFlowCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String projectName;
+    private String flowName;
+    private Map<String, String> flowOptions;
+    private Map<String, String> flowParameters;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanExecuteFlowStatus call()
+        throws AzkabanClientException {
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        HttpPost httpPost = new HttpPost(client.url + "/executor");
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "executeFlow"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.PROJECT, projectName));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.FLOW, flowName));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.CONCURRENT_OPTION, "ignore"));
+
+        addFlowOptions(nvps, flowOptions);
+        addFlowParameters(nvps, flowParameters);
+
+        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+        httpPost.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpPost);
+        closer.register(response);
+        Map<String, String> map = client.handleResponse(response);
+        return new AzkabanExecuteFlowStatus(
+            new AzkabanExecuteFlowStatus.ExecuteId(map.get(AzkabanClientParams.EXECID)));
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot execute flow = "
+            + flowName, e);
+      }
+    }
+
+    private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) {
+      if (flowParams != null) {
+        for (Map.Entry<String, String> entry : flowParams.entrySet()) {
+          String key = entry.getKey();
+          String value = entry.getValue();
+          if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+            nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value));
+          }
+        }
+      }
+    }
+
+    private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) {
+      if (flowOptions != null) {
+        for (Map.Entry<String, String> option : flowOptions.entrySet()) {
+          nvps.add(new BasicNameValuePair(option.getKey(), option.getValue()));
+        }
+      }
+    }
+  }
+
+  /**
+   * A callable that will cancel a flow on Azkaban.
+   */
+  @Builder
+  static class CancelFlowCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String execId;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanClientStatus call()
+        throws AzkabanClientException {
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, String.valueOf(execId)));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+        HttpGet httpGet = new HttpGet(client.url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+        httpGet.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpGet);
+        closer.register(response);
+        client.handleResponse(response);
+        return new AzkabanSuccess();
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot cancel flow execId = "
+            + execId, e);
+      }
+    }
+  }
+
+  /**
+   * A callable that will fetch a flow status on Azkaban.
+   */
+  @Builder
+  static class FetchFlowExecCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String execId;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanFetchExecuteFlowStatus call()
+        throws AzkabanClientException {
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+        HttpGet httpGet = new HttpGet(client.url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+        httpGet.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpGet);
+        closer.register(response);
+        Map<String, String> map = client.handleResponse(response);
+        return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot "
+            + "fetch execId " + execId, e);
+      }
+    }
+  }
+
+  /**
+   * A callable that will fetch a flow log on Azkaban.
+   */
+  @Builder
+  static class FetchExecLogCallable implements Callable<AzkabanClientStatus> {
+    private AzkabanClient client;
+    private String execId;
+    private String jobId;
+    private String offset;
+    private String length;
+    private File output;
+    private boolean invalidSession = false;
+
+    @Override
+    public AzkabanClientStatus call()
+        throws AzkabanClientException {
+      try (Closer closer = Closer.create()) {
+        client.refreshSession(this.invalidSession);
+        List<NameValuePair> nvps = new ArrayList<>();
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchExecJobLogs"));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, client.sessionId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.JOBID, jobId));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.OFFSET, offset));
+        nvps.add(new BasicNameValuePair(AzkabanClientParams.LENGTH, length));
+
+        Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
+        Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
+
+        HttpGet httpGet = new HttpGet(client.url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
+        httpGet.setHeaders(new Header[]{contentType, requestType});
+
+        CloseableHttpResponse response = client.httpClient.execute(httpGet);
+        closer.register(response);
+        Map<String, String> map = client.handleResponse(response);
+        FileUtils.writeStringToFile(output, map.get(AzkabanClientParams.DATA), Charsets.UTF_8);
+        return new AzkabanSuccess();
+      } catch (InvalidSessionException e) {
+        this.invalidSession = true;
+        throw e;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot "
+            + "fetch execId " + execId, e);
+      }
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSuccess.java
similarity index 71%
copy from gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
copy to gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSuccess.java
index da94c03..07b8391 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSuccess.java
@@ -17,19 +17,15 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.io.IOException;
-
 /**
- * Exception raised by {@link AzkabanClient}.
+ * A successful status for {@link AzkabanClient}.
  */
-public class AzkabanClientException extends IOException {
-  private static final long serialVersionUID = 11324144L;
-
-  public AzkabanClientException(String message, Exception e) {
-    super(message, e);
+public class AzkabanSuccess extends AzkabanClientStatus<String> {
+  public AzkabanSuccess() {
+    this("");
   }
 
-  public AzkabanClientException(String message) {
-    super(message);
+  public AzkabanSuccess(String response) {
+    super(response);
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/InvalidSessionException.java
similarity index 75%
copy from gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
copy to gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/InvalidSessionException.java
index da94c03..77d278f 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/InvalidSessionException.java
@@ -17,19 +17,15 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.io.IOException;
-
 /**
- * Exception raised by {@link AzkabanClient}.
+ * Used by {@link AzkabanClient} to indicate current session is invalid.
  */
-public class AzkabanClientException extends IOException {
-  private static final long serialVersionUID = 11324144L;
-
-  public AzkabanClientException(String message, Exception e) {
+public class InvalidSessionException extends AzkabanClientException {
+  public InvalidSessionException(String message, Exception e) {
     super(message, e);
   }
 
-  public AzkabanClientException(String message) {
+  public InvalidSessionException(String message) {
     super(message);
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/UnreachableStatementException.java
similarity index 75%
copy from gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
copy to gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/UnreachableStatementException.java
index da94c03..7e62fd7 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientException.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/UnreachableStatementException.java
@@ -17,19 +17,15 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.io.IOException;
-
 /**
- * Exception raised by {@link AzkabanClient}.
+ * Used by {@link AzkabanClient} to indicate an unreachable code block.
  */
-public class AzkabanClientException extends IOException {
-  private static final long serialVersionUID = 11324144L;
-
-  public AzkabanClientException(String message, Exception e) {
+public class UnreachableStatementException extends AzkabanClientException {
+  public UnreachableStatementException(String message, Exception e) {
     super(message, e);
   }
 
-  public AzkabanClientException(String message) {
+  public UnreachableStatementException(String message) {
     super(message);
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
index edefe3e..e648830 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -70,41 +70,44 @@ public class AzkabanClientTest {
     this.client.close();
   }
 
-  private void ensureProjectExist(String projectName, String description) {
-    AzkabanClientStatus status;
+  private void ensureProjectExist(String projectName, String description) throws AzkabanClientException {
     // make sure it is in a clean state
-    status = this.client.deleteProject(projectName);
-    Assert.assertTrue(status.isSuccess());
+    this.client.deleteProject(projectName);
 
     // make sure the project is created successfully
-    status = this.client.createProject(projectName, description);
-    Assert.assertTrue(status.isSuccess());
+    this.client.createProject(projectName, description);
   }
 
   @Test(enabled = false)
-  public void testCreateProject() {
+  public void testFetchLog() throws AzkabanClientException {
+    String execId = "11211956";
+    String jobId = "tracking-hourly-bucket1";
+
+    // fetch log
+    this.client.fetchExecutionLog(execId, jobId, "0", "100000000", new File("/tmp/sample.log"));
+  }
+
+
+  @Test(enabled = false)
+  public void testCreateProject() throws AzkabanClientException {
     String projectName = "project-create";
     String description = "This is a create project test.";
-    AzkabanClientStatus status;
 
     ensureProjectExist(projectName, description);
 
     // the second time creation should fail
-    status = this.client.createProject(projectName, description);
-    Assert.assertFalse(status.isSuccess());
+    this.client.createProject(projectName, description);
   }
 
   @Test(enabled = false)
-  public void testDeleteProject() {
+  public void testDeleteProject() throws AzkabanClientException {
     String projectName = "project-delete";
     String description = "This is a delete project test.";
-    AzkabanClientStatus status;
 
     ensureProjectExist(projectName, description);
 
     // delete the new project
-    status = this.client.deleteProject(projectName);
-    Assert.assertTrue(status.isSuccess());
+    this.client.deleteProject(projectName);
   }
 
   @Test(enabled = false)
@@ -112,18 +115,20 @@ public class AzkabanClientTest {
     String projectName = "project-upload";
     String description = "This is a upload project test.";
     String flowName = "test-upload";
-    AzkabanClientStatus status;
 
     ensureProjectExist(projectName, description);
 
     // upload Zip to project
     File zipFile = createAzkabanZip(flowName);
-    status = this.client.uploadProjectZip(projectName, zipFile);
-    Assert.assertTrue(status.isSuccess());
+    this.client.uploadProjectZip(projectName, zipFile);
 
     // upload Zip to an non-existed project
-    status = this.client.uploadProjectZip("Non-existed-project", zipFile);
-    Assert.assertFalse(status.isSuccess());
+    try {
+      this.client.uploadProjectZip("Non-existed-project", zipFile);
+      Assert.fail();
+    } catch (Exception e) {
+      log.info("Expected exception " + e.toString());
+    }
   }
 
   @Test(enabled = false)
@@ -136,12 +141,10 @@ public class AzkabanClientTest {
 
     // upload Zip to project
     File zipFile = createAzkabanZip(flowName);
-    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
-    Assert.assertTrue(status.isSuccess());
+    this.client.uploadProjectZip(projectName, zipFile);
 
     // execute a flow
     AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, flowName, Maps.newHashMap());
-    Assert.assertTrue(execStatus.isSuccess());
     log.info("Execid: {}", execStatus.getResponse().execId);
   }
 
@@ -155,8 +158,7 @@ public class AzkabanClientTest {
 
     // upload Zip to project
     File zipFile = createAzkabanZip(flowName);
-    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
-    Assert.assertTrue(status.isSuccess());
+    this.client.uploadProjectZip(projectName, zipFile);
 
     Map<String, String> flowParams = Maps.newHashMap();
     flowParams.put("gobblin.source", "DummySource");
@@ -164,7 +166,6 @@ public class AzkabanClientTest {
 
     // execute a flow
     AzkabanExecuteFlowStatus execStatus = this.client.executeFlow(projectName, flowName, flowParams);
-    Assert.assertTrue(execStatus.isSuccess());
     log.info("Execid: {}", execStatus.getResponse().execId);
   }
 
@@ -178,14 +179,12 @@ public class AzkabanClientTest {
 
     // upload Zip to project
     File zipFile = createAzkabanZip(flowName);
-    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
-    Assert.assertTrue(status.isSuccess());
+    this.client.uploadProjectZip(projectName, zipFile);
 
     Map<String, String> flowOptions = Maps.newHashMap();
 
     // execute a flow
     AzkabanExecuteFlowStatus execStatus = this.client.executeFlowWithOptions(projectName, flowName, flowOptions, Maps.newHashMap());
-    Assert.assertTrue(execStatus.isSuccess());
     log.info("Execid: {}", execStatus.getResponse().execId);
   }
 
@@ -199,14 +198,12 @@ public class AzkabanClientTest {
 
     // upload Zip to project
     File zipFile = createAzkabanZip(flowName);
-    AzkabanClientStatus status = this.client.uploadProjectZip(projectName, zipFile);
-    Assert.assertTrue(status.isSuccess());
+    this.client.uploadProjectZip(projectName, zipFile);
 
     Map<String, String> flowOptions = Maps.newHashMap();
 
     // execute a flow
     AzkabanExecuteFlowStatus execStatus = this.client.executeFlowWithOptions(projectName, flowName, flowOptions, Maps.newHashMap());
-    Assert.assertTrue(execStatus.isSuccess());
     log.info("Execid: {}", execStatus.getResponse().execId);
 
     // wait for the job started and failed
@@ -214,7 +211,9 @@ public class AzkabanClientTest {
 
     // job should fail
     AzkabanFetchExecuteFlowStatus fetchExecuteFlowStatus = this.client.fetchFlowExecution(execStatus.getResponse().execId);
-    Assert.assertTrue(fetchExecuteFlowStatus.isSuccess());
+    for (Map.Entry<String, String> entry : fetchExecuteFlowStatus.getResponse().getMap().entrySet()) {
+      log.info(entry.getKey() + " -> " + entry.getValue());
+    }
   }
 
   @Test(enabled = false)