You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/30 15:55:32 UTC

[6/9] incubator-gobblin git commit: Support to execute Azkaban project from Orchestrator

Support to execute Azkaban project from Orchestrator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0bb5139c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0bb5139c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0bb5139c

Branch: refs/heads/master
Commit: 0bb5139c8822ded33295b9eb118b67df1cb9f418
Parents: e285202
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 03:36:04 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 03:36:04 2017 -0700

----------------------------------------------------------------------
 .../orchestration/AzkabanAjaxAPIClient.java     | 340 +++++++++++--------
 .../modules/orchestration/AzkabanJobHelper.java |  70 +++-
 .../orchestration/AzkabanProjectConfig.java     |   2 +-
 .../AzkabanSpecExecutorInstanceProducer.java    |  59 ++--
 gradle/scripts/dependencyDefinitions.gradle     |   1 +
 5 files changed, 307 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 31fc753..d0b8471 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -37,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.EncoderException;
 import org.apache.commons.codec.net.URLCodec;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -55,13 +52,14 @@ import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
 
 @Slf4j
 public class AzkabanAjaxAPIClient {
-
   private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
 
   // TODO: Ensure GET call urls do not grow too big
@@ -71,93 +69,107 @@ public class AzkabanAjaxAPIClient {
   private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
   private static final URLCodec codec = new URLCodec();
 
+  /***
+   * Authenticate a user and obtain a session.id from response. Once a session.id has been obtained,
+   * until the session expires, this id can be used to do any API requests with a proper permission granted.
+   * A session expires if the user logs out, changes machine, browser or location, if Azkaban is restarted,
+   * or if the session times out. The default session timeout is 24 hours (one day). A user can re-login irrespective
+   * of whether the session has expired or not. For the same user, a new session will always override the old one.
+   * @param username Username.
+   * @param password Password.
+   * @param azkabanServerUrl Azkaban Server Url.
+   * @return Session Id.
+   * @throws IOException
+   * @throws EncoderException
+   */
   public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
       throws IOException, EncoderException {
     // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl);
-    StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
-        username, codec.encode(password)));
-    input.setContentType("application/x-www-form-urlencoded");
-    postRequest.setEntity(input);
-    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+    Map<String, String> params = Maps.newHashMap();
+    params.put("action", "login");
+    params.put("username", username);
+    params.put("password", codec.encode(password));
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(postRequest);
-
-    return handleResponse(response, "session.id").get("session.id");
+    return executePostRequest(preparePostRequest(azkabanServerUrl, null, params)).get("session.id");
   }
 
+  /***
+   * Get project.id for a Project Name.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
     // Note: Every get call to Azkaban provides a projectId in response, so we have are using fetchProjectFlows call
     // .. because it does not need any additional params other than project name
-    // Create get request
-    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&"
-            + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId,
-        azkabanProjectConfig.getAzkabanProjectName()));
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "fetchprojectflows");
+    params.put("project", azkabanProjectConfig.getAzkabanProjectName());
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    return handleResponse(response, "projectId").get("projectId");
+    return executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager",
+        sessionId, params)).get("projectId");
   }
 
+  /***
+   * Creates an Azkaban project and uploads the zip file. If proxy users and group permissions are specified in
+   * Azkaban Project Config, then this method also adds them to the project configuration.
+   * @param sessionId Session Id.
+   * @param zipFilePath Zip file to upload.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String createAzkabanProject(String sessionId, String zipFilePath,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "executeFlow");
+    params.put("name", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("description", azkabanProjectConfig.getAzkabanProjectDescription());
 
-    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
-    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
-    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
-    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
-    // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
-    StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId,
-        azkabanProjectName, azkabanProjectDescription));
-    input.setContentType("application/x-www-form-urlencoded");
-    postRequest.setEntity(input);
-    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
-
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(postRequest);
-    handleResponse(response);
+    executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() +
+        "/manager?action=create", sessionId, params));
 
     // Add proxy user if any
     if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
       Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
       for (String user : proxyUsers) {
-        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+        addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), user);
       }
     }
 
     // Add group permissions if any
     // TODO: Support users (not just groups), and different permission types
     // (though we can add users, we only support groups at the moment and award them with admin permissions)
-    if (StringUtils.isNotBlank(groupAdminUsers)) {
-      String [] groups = StringUtils.split(groupAdminUsers, ",");
+    if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+      String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
       for (String group : groups) {
-        addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
-            false, false);
+        addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(),
+            group, true, true, false, false,false,
+            false);
       }
     }
 
     // Upload zip file to azkaban and return projectId
-    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+    return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
   }
 
+  /***
+   * Replace an existing Azkaban Project. If proxy users and group permissions are specified in
+   * Azkaban Project Config, then this method also adds them to the project configuration.
+   * @param sessionId Session Id.
+   * @param zipFilePath Zip file to upload.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String replaceAzkabanProject(String sessionId, String zipFilePath,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
-
-    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
-    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
-    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
-    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
     // Change project description
-    changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+    changeProjectDescription(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName(), azkabanProjectConfig.getAzkabanProjectDescription());
 
     // Add proxy user if any
     // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
@@ -166,7 +178,8 @@ public class AzkabanAjaxAPIClient {
     if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
       Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
       for (String user : proxyUsers) {
-        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+        addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+            azkabanProjectConfig.getAzkabanProjectName(), user);
       }
     }
 
@@ -175,12 +188,13 @@ public class AzkabanAjaxAPIClient {
     // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
     //       2. Adding same group-user will return an error message, but we will ignore it
     // (though we can add users, we only support groups at the moment and award them with admin permissions)
-    if (StringUtils.isNotBlank(groupAdminUsers)) {
-      String [] groups = StringUtils.split(groupAdminUsers, ",");
+    if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+      String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
       for (String group : groups) {
         try {
-          addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
-              false);
+          addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+              azkabanProjectConfig.getAzkabanProjectName(), group, true, true,
+              false, false, false,false);
         } catch (IOException e) {
           // Ignore if group already exists, we cannot list existing groups; so its okay to attempt adding exiting
           // .. groups
@@ -192,21 +206,20 @@ public class AzkabanAjaxAPIClient {
     }
 
     // Upload zip file to azkaban and return projectId
-    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+    return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
   }
 
   private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
       String proxyUser)
       throws IOException {
-
     // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
-    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&"
-        + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser));
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "addProxyUser");
+    params.put("project", azkabanProjectName);
+    params.put("name", proxyUser);
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    handleResponse(response);
+    executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
   }
 
   private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
@@ -219,100 +232,155 @@ public class AzkabanAjaxAPIClient {
 
     // Create get request (adding same normal user permission multiple times will throw an error, but we cannot
     // list whole list of permissions anyways)
-    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&"
-            + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
-            + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name,
-        isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission));
-
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    handleResponse(response);
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "addPermission");
+    params.put("project", azkabanProjectName);
+    params.put("name", name);
+    params.put("group", Boolean.toString(isGroup));
+    params.put("permissions[admin]", Boolean.toString(adminPermission));
+    params.put("permissions[read]", Boolean.toString(readPermission));
+    params.put("permissions[write]", Boolean.toString(writePermission));
+    params.put("permissions[execute]", Boolean.toString(executePermission));
+    params.put("permissions[schedule]", Boolean.toString(schedulePermission));
+
+    executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
   }
 
-  private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
-      String jobZipFile)
+  /***
+   * Schedule the Azkaban Project to run with a schedule.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @throws IOException
+   */
+  public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "scheduleFlow");
+    params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+    params.put("projectId", azkabanProjectId);
+    params.put("scheduleTime", getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR,
+        LOW_NETWORK_TRAFFIC_END_HOUR, JOB_START_DELAY_MINUTES));
+    params.put("scheduleDate", getScheduledDateInAzkabanFormat());
+    params.put("is_recurring", "off");
+
+    // Run once OR push down schedule (TODO: Enable when push down is finalized)
+    //    if (azkabanProjectConfig.isScheduled()) {
+    //      params.put("is_recurring", "on");
+    //      params.put("period", "1d");
+    //    } else {
+    //      params.put("is_recurring", "off");
+    //    }
+
+    executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/schedule", sessionId, params));
+  }
 
-    // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
-    HttpEntity entity = MultipartEntityBuilder
-        .create()
-        .addTextBody("session.id", sessionId)
-        .addTextBody("ajax", "upload")
-        .addBinaryBody("file", new File(jobZipFile),
-            ContentType.create("application/zip"), azkabanProjectName + ".zip")
-        .addTextBody("project", azkabanProjectName)
-        .build();
-    postRequest.setEntity(entity);
+  private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String projectDescription)
+      throws IOException {
+    String encodedProjectDescription;
+    try {
+      encodedProjectDescription = new URLCodec().encode(projectDescription);
+    } catch (EncoderException e) {
+      throw new IOException("Could not encode Azkaban project description", e);
+    }
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(postRequest);
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "changeDescription");
+    params.put("project", azkabanProjectName);
+    params.put("description", encodedProjectDescription);
 
-    // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
-    return handleResponse(response, "projectId").get("projectId");
+    executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
   }
 
-  public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+  /***
+   * Execute an existing Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @throws IOException
+   */
+  public static void executeAzkabanProject(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
-    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
-    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
-    String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "executeFlow");
+    params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+
+    executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params));
+  }
+
+  private static HttpGet prepareGetRequest(String requestUrl, String sessionId, Map<String, String> params)
+      throws IOException {
+    // Create get request
+    StringBuilder stringEntityBuilder = new StringBuilder();
+    stringEntityBuilder.append(String.format("?session.id=%s", sessionId));
+    for (Map.Entry<String, String> entry : params.entrySet()) {
+      stringEntityBuilder.append(String.format("&%s=%s", entry.getKey(), entry.getValue()));
+    }
 
-    String scheduleString = "is_recurring=off"; // run only once
-    // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
-//    if (azkabanProjectConfig.isScheduled()) {
-//      scheduleString = "is_recurring=on&period=1d"; // schedule once every day
-//    }
+    return new HttpGet(requestUrl + stringEntityBuilder);
+  }
 
+  private static HttpPost preparePostRequest(String requestUrl, String sessionId, Map<String, String> params)
+      throws IOException {
     // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule");
-    StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow"
-            + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
-        sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId,
-        getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR,
-            JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString));
+    HttpPost postRequest = new HttpPost(requestUrl);
+    StringBuilder stringEntityBuilder = new StringBuilder();
+    stringEntityBuilder.append(String.format("session.id=%s", sessionId));
+    for (Map.Entry<String, String> entry : params.entrySet()) {
+      if (stringEntityBuilder.length() > 0) {
+        stringEntityBuilder.append("&");
+      }
+      stringEntityBuilder.append(String.format("%s=%s", entry.getKey(), entry.getValue()));
+    }
+    StringEntity input = new StringEntity(stringEntityBuilder.toString());
     input.setContentType("application/x-www-form-urlencoded");
     postRequest.setEntity(input);
     postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
 
+    return postRequest;
+  }
+
+  private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(getRequest);
+    return handleResponse(response);
+  }
+
+  private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
     HttpResponse response = httpClient.execute(postRequest);
-    handleResponse(response);
+    return handleResponse(response);
   }
 
-  private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
-      String projectDescription)
+  private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String jobZipFile)
       throws IOException {
 
-    HttpGet getRequest;
-    try {
-      // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
-      getRequest = new HttpGet(String
-          .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl,
-              sessionId, azkabanProjectName, new URLCodec().encode(projectDescription)));
-    } catch (EncoderException e) {
-      throw new IOException("Could not encode Azkaban project description", e);
-    }
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+    HttpEntity entity = MultipartEntityBuilder
+        .create()
+        .addTextBody("session.id", sessionId)
+        .addTextBody("ajax", "upload")
+        .addBinaryBody("file", new File(jobZipFile),
+            ContentType.create("application/zip"), azkabanProjectName + ".zip")
+        .addTextBody("project", azkabanProjectName)
+        .build();
+    postRequest.setEntity(entity);
 
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    handleResponse(response);
-  }
+    HttpResponse response = httpClient.execute(postRequest);
 
-  public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
-      AzkabanProjectConfig azkabanProjectConfig)
-      throws IOException {
-    boolean isGoUrl = false;
-    if (!StringUtils.isBlank(uberdistcp2ToolServer)) {
-      if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) {
-        isGoUrl = true;
-      }
-    }
+    // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
+    return handleResponse(response, "projectId").get("projectId");
   }
 
   private static CloseableHttpClient getHttpClient()
@@ -350,11 +418,9 @@ public class AzkabanAjaxAPIClient {
       // Handle error if any
       handleResponseError(jsonObject);
 
-      // Get required responseKeys
-      if (ArrayUtils.isNotEmpty(responseKeys)) {
-        for (String responseKey : responseKeys) {
-          responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
-        }
+      // Get all responseKeys
+      for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+        responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", ""));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index 627761e..a74a6ad 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -46,6 +46,13 @@ import com.google.common.collect.Lists;
 @Slf4j
 public class AzkabanJobHelper {
 
+  /***
+   * Checks if an Azkaban project exists by name.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains project name.
+   * @return true if project exists else false.
+   * @throws IOException
+   */
   public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
@@ -74,6 +81,13 @@ public class AzkabanJobHelper {
     }
   }
 
+  /***
+   * Get Project Id by an Azkaban Project Name.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains project Name.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -83,6 +97,14 @@ public class AzkabanJobHelper {
     return projectId;
   }
 
+  /***
+   * Create project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+   * Azkaban, setting permissions and schedule.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -98,6 +120,15 @@ public class AzkabanJobHelper {
     return projectId;
   }
 
+  /***
+   * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+   * Azkaban, setting permissions and schedule.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig) throws IOException {
     log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -113,6 +144,13 @@ public class AzkabanJobHelper {
     return projectId;
   }
 
+  /***
+   * Schedule an already created Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+   * @throws IOException
+   */
   public static void scheduleJob(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
@@ -120,6 +158,13 @@ public class AzkabanJobHelper {
     AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
   }
 
+  /***
+   * Change the schedule of an already created Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+   * @throws IOException
+   */
   public static void changeJobSchedule(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
@@ -127,7 +172,28 @@ public class AzkabanJobHelper {
     AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
   }
 
-  public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+  /***
+   * Execute an already created Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains the project name and flow name to execute.
+   * @throws IOException
+   */
+  public static void executeJob(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Executing Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+    AzkabanAjaxAPIClient.executeAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+
+  /***
+   * Create Azkaban project zip file.
+   * @param azkabanProjectConfig Azkaban Project Config that contains information about what to include in
+   *                             zip file.
+   * @return Zip file path.
+   * @throws IOException
+   */
+  private static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
     String workDir = azkabanProjectConfig.getWorkDir();
@@ -153,7 +219,7 @@ public class AzkabanJobHelper {
           jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl);
           filesToAdd.add(jobJarFile);
         } catch (IOException e) {
-          if(failIfJarNotFound) {
+          if (failIfJarNotFound) {
             throw e;
           }
           log.warn("Could not download: " + jobJarFile);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index ddae3d9..2bac65d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -69,7 +69,7 @@ public class AzkabanProjectConfig {
     this.azkabanProjectName = constructProjectName(jobSpec, config);
     this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
     this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
-    this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+    this.azkabanGroupAdminUsers = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY, "");
     this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
 
     // Azkaban Project Zip

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index 5471f0c..f73bc6c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -39,7 +39,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
     implements SpecExecutorInstanceProducer<Spec>, Closeable {
 
   // Session Id for GaaS User
-  private String sessionId;
+  private String _sessionId;
 
 
   public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
@@ -51,7 +51,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
       String azkabanPassword = getAzkabanPassword(config);
       String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
 
-      sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+      _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
     } catch (IOException | EncoderException e) {
       throw new RuntimeException("Could not authenticate with Azkaban", e);
     }
@@ -82,37 +82,46 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
   @Override
   public Future<?> addSpec(Spec addedSpec) {
     // If project already exists, execute it
-
-    // If project does not already exists, create and execute it
-    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
     try {
-      _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
-
-      // Deleted project also returns true if-project-exists check, so optimistically first create the project
-      // .. (it will create project if it was never created or deleted), if project exists it will fail with
-      // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
-      // .. specified
-      try {
-        createNewAzkabanProject(sessionId, azkabanProjectConfig);
-      } catch (IOException e) {
-        if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
-          if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
-              ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
-            _log.info("Project already exists for this Spec, but force overwrite specified");
-            updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+      AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+      boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
+
+      // If the project already exists, execute it; otherwise set it up
+      if (azkabanProjectExists) {
+        _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+        AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
+            azkabanProjectConfig);
+      } else {
+        _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+        // Deleted project also returns true if-project-exists check, so optimistically first create the project
+        // .. (it will create project if it was never created or deleted), if project exists it will fail with
+        // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
+        // .. specified
+        try {
+          createNewAzkabanProject(_sessionId, azkabanProjectConfig);
+        } catch (IOException e) {
+          if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+            if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+                ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+              _log.info("Project already exists for this Spec, but force overwrite specified");
+              updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+            } else {
+              _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+                  azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+            }
           } else {
-            _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
-                azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+            throw e;
           }
-        } else {
-          throw e;
         }
       }
+
+
     } catch (IOException e) {
       throw new RuntimeException("Issue in setting up Azkaban project.", e);
     }
 
-    return null;
+    return new CompletedFuture<>(_config, null);
   }
 
   @Override
@@ -121,7 +130,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
 
     try {
-      updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+      updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
     } catch (IOException e) {
       throw new RuntimeException("Issue in setting up Azkaban project.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 41e1485..1e86b96 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -69,6 +69,7 @@ ext.externalDependency = [
     "hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
     "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
     "httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
+    "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
     "jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",