You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/30 15:55:32 UTC
[6/9] incubator-gobblin git commit: Support to execute Azkaban project
from Orchestrator
Support to execute Azkaban project from Orchestrator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0bb5139c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0bb5139c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0bb5139c
Branch: refs/heads/master
Commit: 0bb5139c8822ded33295b9eb118b67df1cb9f418
Parents: e285202
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 03:36:04 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 03:36:04 2017 -0700
----------------------------------------------------------------------
.../orchestration/AzkabanAjaxAPIClient.java | 340 +++++++++++--------
.../modules/orchestration/AzkabanJobHelper.java | 70 +++-
.../orchestration/AzkabanProjectConfig.java | 2 +-
.../AzkabanSpecExecutorInstanceProducer.java | 59 ++--
gradle/scripts/dependencyDefinitions.gradle | 1 +
5 files changed, 307 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 31fc753..d0b8471 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
@@ -37,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.EncoderException;
import org.apache.commons.codec.net.URLCodec;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -55,13 +52,14 @@ import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@Slf4j
public class AzkabanAjaxAPIClient {
-
private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
// TODO: Ensure GET call urls do not grow too big
@@ -71,93 +69,107 @@ public class AzkabanAjaxAPIClient {
private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
private static final URLCodec codec = new URLCodec();
+ /***
+ * Authenticate a user and obtain a session.id from response. Once a session.id has been obtained,
+ * until the session expires, this id can be used to do any API requests with a proper permission granted.
+ * A session expires if the user logs out, changes machine, browser or location, if Azkaban is restarted,
+ * or if the session times out. The default session timeout is 24 hours (one day). User can re-login irrespective
+ * of whether the session has expired or not. For the same user, a new session will always override the old one.
+ * @param username Username.
+ * @param password Password.
+ * @param azkabanServerUrl Azkaban Server Url.
+ * @return Session Id.
+ * @throws IOException
+ * @throws EncoderException
+ */
public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
throws IOException, EncoderException {
// Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl);
- StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
- username, codec.encode(password)));
- input.setContentType("application/x-www-form-urlencoded");
- postRequest.setEntity(input);
- postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+ Map<String, String> params = Maps.newHashMap();
+ params.put("action", "login");
+ params.put("username", username);
+ params.put("password", codec.encode(password));
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(postRequest);
-
- return handleResponse(response, "session.id").get("session.id");
+ return executePostRequest(preparePostRequest(azkabanServerUrl, null, params)).get("session.id");
}
+ /***
+ * Get project.id for a Project Name.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
// Note: Every get call to Azkaban provides a projectId in response, so we are using the fetchProjectFlows call
// .. because it does not need any additional params other than project name
- // Create get request
- HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&"
- + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId,
- azkabanProjectConfig.getAzkabanProjectName()));
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "fetchprojectflows");
+ params.put("project", azkabanProjectConfig.getAzkabanProjectName());
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- return handleResponse(response, "projectId").get("projectId");
+ return executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager",
+ sessionId, params)).get("projectId");
}
+ /***
+ * Creates an Azkaban project and uploads the zip file. If proxy user and group permissions are specified in
+ * Azkaban Project Config, then this method also adds it to the project configuration.
+ * @param sessionId Session Id.
+ * @param zipFilePath Zip file to upload.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String createAzkabanProject(String sessionId, String zipFilePath,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "executeFlow");
+ params.put("name", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("description", azkabanProjectConfig.getAzkabanProjectDescription());
- String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
- String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
- String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
- String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
- // Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
- StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId,
- azkabanProjectName, azkabanProjectDescription));
- input.setContentType("application/x-www-form-urlencoded");
- postRequest.setEntity(input);
- postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
-
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(postRequest);
- handleResponse(response);
+ executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() +
+ "/manager?action=create", sessionId, params));
// Add proxy user if any
if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
for (String user : proxyUsers) {
- addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+ addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), user);
}
}
// Add group permissions if any
// TODO: Support users (not just groups), and different permission types
// (though we can add users, we only support groups at the moment and award them with admin permissions)
- if (StringUtils.isNotBlank(groupAdminUsers)) {
- String [] groups = StringUtils.split(groupAdminUsers, ",");
+ if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+ String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
for (String group : groups) {
- addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
- false, false);
+ addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(),
+ group, true, true, false, false,false,
+ false);
}
}
// Upload zip file to azkaban and return projectId
- return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+ return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
}
+ /***
+ * Replace an existing Azkaban Project. If proxy user and group permissions are specified in
+ * Azkaban Project Config, then this method also adds it to the project configuration.
+ * @param sessionId Session Id.
+ * @param zipFilePath Zip file to upload.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String replaceAzkabanProject(String sessionId, String zipFilePath,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
-
- String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
- String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
- String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
- String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
// Change project description
- changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+ changeProjectDescription(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), azkabanProjectConfig.getAzkabanProjectDescription());
// Add proxy user if any
// Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
@@ -166,7 +178,8 @@ public class AzkabanAjaxAPIClient {
if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
for (String user : proxyUsers) {
- addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+ addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), user);
}
}
@@ -175,12 +188,13 @@ public class AzkabanAjaxAPIClient {
// Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
// 2. Adding same group-user will return an error message, but we will ignore it
// (though we can add users, we only support groups at the moment and award them with admin permissions)
- if (StringUtils.isNotBlank(groupAdminUsers)) {
- String [] groups = StringUtils.split(groupAdminUsers, ",");
+ if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+ String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
for (String group : groups) {
try {
- addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
- false);
+ addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), group, true, true,
+ false, false, false,false);
} catch (IOException e) {
// Ignore if group already exists, we cannot list existing groups; so it's okay to attempt adding existing
// .. groups
@@ -192,21 +206,20 @@ public class AzkabanAjaxAPIClient {
}
// Upload zip file to azkaban and return projectId
- return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+ return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
}
private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
String proxyUser)
throws IOException {
-
// Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
- HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&"
- + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser));
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "addProxyUser");
+ params.put("project", azkabanProjectName);
+ params.put("name", proxyUser);
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- handleResponse(response);
+ executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
}
private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
@@ -219,100 +232,155 @@ public class AzkabanAjaxAPIClient {
// Create get request (adding same normal user permission multiple times will throw an error, but we cannot
// list whole list of permissions anyways)
- HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&"
- + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
- + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name,
- isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission));
-
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- handleResponse(response);
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "addPermission");
+ params.put("project", azkabanProjectName);
+ params.put("name", name);
+ params.put("group", Boolean.toString(isGroup));
+ params.put("permissions[admin]", Boolean.toString(adminPermission));
+ params.put("permissions[read]", Boolean.toString(readPermission));
+ params.put("permissions[write]", Boolean.toString(writePermission));
+ params.put("permissions[execute]", Boolean.toString(executePermission));
+ params.put("permissions[schedule]", Boolean.toString(schedulePermission));
+
+ executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
}
- private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
- String jobZipFile)
+ /***
+ * Schedule the Azkaban Project to run with a schedule.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @throws IOException
+ */
+ public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "scheduleFlow");
+ params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+ params.put("projectId", azkabanProjectId);
+ params.put("scheduleTime", getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR,
+ LOW_NETWORK_TRAFFIC_END_HOUR, JOB_START_DELAY_MINUTES));
+ params.put("scheduleDate", getScheduledDateInAzkabanFormat());
+ params.put("is_recurring", "off");
+
+ // Run once OR push down schedule (TODO: Enable when push down is finalized)
+ // if (azkabanProjectConfig.isScheduled()) {
+ // params.put("is_recurring", "on");
+ // params.put("period", "1d");
+ // } else {
+ // params.put("is_recurring", "off");
+ // }
+
+ executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/schedule", sessionId, params));
+ }
- // Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
- HttpEntity entity = MultipartEntityBuilder
- .create()
- .addTextBody("session.id", sessionId)
- .addTextBody("ajax", "upload")
- .addBinaryBody("file", new File(jobZipFile),
- ContentType.create("application/zip"), azkabanProjectName + ".zip")
- .addTextBody("project", azkabanProjectName)
- .build();
- postRequest.setEntity(entity);
+ private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String projectDescription)
+ throws IOException {
+ String encodedProjectDescription;
+ try {
+ encodedProjectDescription = new URLCodec().encode(projectDescription);
+ } catch (EncoderException e) {
+ throw new IOException("Could not encode Azkaban project description", e);
+ }
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(postRequest);
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "changeDescription");
+ params.put("project", azkabanProjectName);
+ params.put("description", encodedProjectDescription);
- // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
- return handleResponse(response, "projectId").get("projectId");
+ executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
}
- public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+ /***
+ * Execute an existing Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @throws IOException
+ */
+ public static void executeAzkabanProject(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
- String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
- String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
- String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "executeFlow");
+ params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+
+ executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params));
+ }
+
+ private static HttpGet prepareGetRequest(String requestUrl, String sessionId, Map<String, String> params)
+ throws IOException {
+ // Create get request
+ StringBuilder stringEntityBuilder = new StringBuilder();
+ stringEntityBuilder.append(String.format("?session.id=%s", sessionId));
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ stringEntityBuilder.append(String.format("&%s=%s", entry.getKey(), entry.getValue()));
+ }
- String scheduleString = "is_recurring=off"; // run only once
- // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
-// if (azkabanProjectConfig.isScheduled()) {
-// scheduleString = "is_recurring=on&period=1d"; // schedule once every day
-// }
+ return new HttpGet(requestUrl + stringEntityBuilder);
+ }
+ private static HttpPost preparePostRequest(String requestUrl, String sessionId, Map<String, String> params)
+ throws IOException {
// Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule");
- StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow"
- + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
- sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId,
- getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR,
- JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString));
+ HttpPost postRequest = new HttpPost(requestUrl);
+ StringBuilder stringEntityBuilder = new StringBuilder();
+ stringEntityBuilder.append(String.format("session.id=%s", sessionId));
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ if (stringEntityBuilder.length() > 0) {
+ stringEntityBuilder.append("&");
+ }
+ stringEntityBuilder.append(String.format("%s=%s", entry.getKey(), entry.getValue()));
+ }
+ StringEntity input = new StringEntity(stringEntityBuilder.toString());
input.setContentType("application/x-www-form-urlencoded");
postRequest.setEntity(input);
postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+ return postRequest;
+ }
+
+ private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(getRequest);
+ return handleResponse(response);
+ }
+
+ private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
// Make the call, get response
@Cleanup CloseableHttpClient httpClient = getHttpClient();
HttpResponse response = httpClient.execute(postRequest);
- handleResponse(response);
+ return handleResponse(response);
}
- private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
- String projectDescription)
+ private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String jobZipFile)
throws IOException {
- HttpGet getRequest;
- try {
- // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
- getRequest = new HttpGet(String
- .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl,
- sessionId, azkabanProjectName, new URLCodec().encode(projectDescription)));
- } catch (EncoderException e) {
- throw new IOException("Could not encode Azkaban project description", e);
- }
+ // Create post request
+ HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+ HttpEntity entity = MultipartEntityBuilder
+ .create()
+ .addTextBody("session.id", sessionId)
+ .addTextBody("ajax", "upload")
+ .addBinaryBody("file", new File(jobZipFile),
+ ContentType.create("application/zip"), azkabanProjectName + ".zip")
+ .addTextBody("project", azkabanProjectName)
+ .build();
+ postRequest.setEntity(entity);
// Make the call, get response
@Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- handleResponse(response);
- }
+ HttpResponse response = httpClient.execute(postRequest);
- public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
- AzkabanProjectConfig azkabanProjectConfig)
- throws IOException {
- boolean isGoUrl = false;
- if (!StringUtils.isBlank(uberdistcp2ToolServer)) {
- if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) {
- isGoUrl = true;
- }
- }
+ // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
+ return handleResponse(response, "projectId").get("projectId");
}
private static CloseableHttpClient getHttpClient()
@@ -350,11 +418,9 @@ public class AzkabanAjaxAPIClient {
// Handle error if any
handleResponseError(jsonObject);
- // Get required responseKeys
- if (ArrayUtils.isNotEmpty(responseKeys)) {
- for (String responseKey : responseKeys) {
- responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
- }
+ // Get all responseKeys
+ for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+ responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", ""));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index 627761e..a74a6ad 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -46,6 +46,13 @@ import com.google.common.collect.Lists;
@Slf4j
public class AzkabanJobHelper {
+ /***
+ * Checks if an Azkaban project exists by name.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains project name.
+ * @return true if project exists else false.
+ * @throws IOException
+ */
public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
@@ -74,6 +81,13 @@ public class AzkabanJobHelper {
}
}
+ /***
+ * Get Project Id by an Azkaban Project Name.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains project Name.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -83,6 +97,14 @@ public class AzkabanJobHelper {
return projectId;
}
+ /***
+ * Create project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+ * Azkaban, setting permissions and schedule.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -98,6 +120,15 @@ public class AzkabanJobHelper {
return projectId;
}
+ /***
+ * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+ * Azkaban, setting permissions and schedule.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig) throws IOException {
log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -113,6 +144,13 @@ public class AzkabanJobHelper {
return projectId;
}
+ /***
+ * Schedule an already created Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+ * @throws IOException
+ */
public static void scheduleJob(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
@@ -120,6 +158,13 @@ public class AzkabanJobHelper {
AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
}
+ /***
+ * Change the schedule of an already created Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+ * @throws IOException
+ */
public static void changeJobSchedule(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
@@ -127,7 +172,28 @@ public class AzkabanJobHelper {
AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
}
- public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+ /***
+ * Execute an already created Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+ * @throws IOException
+ */
+ public static void executeJob(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Executing Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+ AzkabanAjaxAPIClient.executeAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+ }
+
+ /***
+ * Create Azkaban project zip file.
+ * @param azkabanProjectConfig Azkaban Project Config that contains information about what to include in
+ * zip file.
+ * @return Zip file path.
+ * @throws IOException
+ */
+ private static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
String workDir = azkabanProjectConfig.getWorkDir();
@@ -153,7 +219,7 @@ public class AzkabanJobHelper {
jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl);
filesToAdd.add(jobJarFile);
} catch (IOException e) {
- if(failIfJarNotFound) {
+ if (failIfJarNotFound) {
throw e;
}
log.warn("Could not download: " + jobJarFile);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index ddae3d9..2bac65d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -69,7 +69,7 @@ public class AzkabanProjectConfig {
this.azkabanProjectName = constructProjectName(jobSpec, config);
this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
- this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+ this.azkabanGroupAdminUsers = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY, "");
this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
// Azkaban Project Zip
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index 5471f0c..f73bc6c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -39,7 +39,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
implements SpecExecutorInstanceProducer<Spec>, Closeable {
// Session Id for GaaS User
- private String sessionId;
+ private String _sessionId;
public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
@@ -51,7 +51,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
String azkabanPassword = getAzkabanPassword(config);
String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
- sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+ _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
} catch (IOException | EncoderException e) {
throw new RuntimeException("Could not authenticate with Azkaban", e);
}
@@ -82,37 +82,46 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
@Override
public Future<?> addSpec(Spec addedSpec) {
// If project already exists, execute it
-
- // If project does not already exists, create and execute it
- AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
try {
- _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
-
- // Deleted project also returns true if-project-exists check, so optimistically first create the project
- // .. (it will create project if it was never created or deleted), if project exists it will fail with
- // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
- // .. specified
- try {
- createNewAzkabanProject(sessionId, azkabanProjectConfig);
- } catch (IOException e) {
- if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
- if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
- ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
- _log.info("Project already exists for this Spec, but force overwrite specified");
- updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+ AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+ boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
+
+ // If the project already exists, execute it; otherwise set it up (create it, or overwrite it when configured)
+ if (azkabanProjectExists) {
+ _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+ AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
+ azkabanProjectConfig);
+ } else {
+ _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+ // A deleted project also passes the if-project-exists check, so optimistically try to create the project first
+ // .. (this creates the project if it was never created or was deleted). If the project exists, creation fails
+ // .. with an appropriate exception message; catch that and run in replace-project mode if force overwrite is
+ // .. specified
+ try {
+ createNewAzkabanProject(_sessionId, azkabanProjectConfig);
+ } catch (IOException e) {
+ if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+ if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+ ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+ _log.info("Project already exists for this Spec, but force overwrite specified");
+ updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+ } else {
+ _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+ azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+ }
} else {
- _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
- azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+ throw e;
}
- } else {
- throw e;
}
}
+
+
} catch (IOException e) {
throw new RuntimeException("Issue in setting up Azkaban project.", e);
}
- return null;
+ return new CompletedFuture<>(_config, null);
}
@Override
@@ -121,7 +130,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
try {
- updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+ updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
} catch (IOException e) {
throw new RuntimeException("Issue in setting up Azkaban project.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 41e1485..1e86b96 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -69,6 +69,7 @@ ext.externalDependency = [
"hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
"hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
"httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
+ "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
"httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
"httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
"jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",