You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/30 15:55:27 UTC
[1/9] incubator-gobblin git commit: Azkaban Orchestrator for GaaS
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 90be15f47 -> 9b9fec817
Azkaban Orchestrator for GaaS
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/08e60efd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/08e60efd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/08e60efd
Branch: refs/heads/master
Commit: 08e60efd328485554344b179da2a54466d84628b
Parents: f96379e
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Tue Aug 8 12:41:02 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 8 12:41:02 2017 -0700
----------------------------------------------------------------------
.../service-azkaban-hello-world.template | 24 +
.../orchestration/AzkabanAjaxAPIClient.java | 435 +++++++++++++++++++
.../modules/orchestration/AzkabanJobHelper.java | 272 ++++++++++++
.../orchestration/AzkabanProjectConfig.java | 123 ++++++
.../AzkabanSpecExecutorInstance.java | 106 +++++
.../AzkabanSpecExecutorInstanceProducer.java | 158 +++++++
.../orchestration/ServiceAzkabanConfigKeys.java | 38 ++
.../main/resources/default-service-azkaban.conf | 33 ++
.../service-azkaban-hello-world.template | 0
9 files changed, 1189 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-example/src/main/resources/service-azkaban-hello-world.template
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/service-azkaban-hello-world.template b/gobblin-example/src/main/resources/service-azkaban-hello-world.template
new file mode 100644
index 0000000..36a7418
--- /dev/null
+++ b/gobblin-example/src/main/resources/service-azkaban-hello-world.template
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.service.azkaban.username=<CHANGE ME>
+gobblin.service.azkaban.password=<CHANGE ME>
+gobblin.service.azkaban.server.url=<CHANGE ME>
+gobblin.service.azkaban.project.namePrefix=GobblinService_
+gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
+gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
+gobblin.service.azkaban.project.groupAdmins=""
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
new file mode 100644
index 0000000..31fc753
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.commons.codec.net.URLCodec;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.impl.client.BasicCookieStore;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
+
+import com.google.common.base.Splitter;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+
+@Slf4j
+public class AzkabanAjaxAPIClient {
+
+ private static final Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+
+ // TODO: Ensure GET call urls do not grow too big
+ private static final int LOW_NETWORK_TRAFFIC_BEGIN_HOUR = 17; // scheduling window start hour (24 hr, inclusive)
+ private static final int LOW_NETWORK_TRAFFIC_END_HOUR = 22; // scheduling window end hour (24 hr, exclusive)
+ private static final int JOB_START_DELAY_MINUTES = 5; // delay added when scheduling inside the window
+ private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
+ private static final URLCodec codec = new URLCodec(); // shared encoder for request parameters
+
+ public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
+ throws IOException, EncoderException {
+ // Create form-encoded login post request; both credentials are URL-encoded so reserved characters survive
+ HttpPost postRequest = new HttpPost(azkabanServerUrl);
+ StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
+ codec.encode(username), codec.encode(password)));
+ input.setContentType("application/x-www-form-urlencoded");
+ postRequest.setEntity(input);
+ postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(postRequest);
+
+ return handleResponse(response, "session.id").get("session.id");
+ }
+
+ public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+ // Note: Every get call to Azkaban provides a projectId in its response, so we are using the fetchProjectFlows
+ // .. call because it does not need any additional params other than the project name
+ // Create get request
+ HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&"
+ + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId,
+ azkabanProjectConfig.getAzkabanProjectName()));
+
+ // Make the call, read "projectId" out of the JSON response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(getRequest);
+ return handleResponse(response, "projectId").get("projectId");
+ }
+
+ public static String createAzkabanProject(String sessionId, String zipFilePath,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+
+ String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+ String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+ String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
+ String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
+
+ // Create post request that creates the (still empty) project
+ HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
+ StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId,
+ azkabanProjectName, azkabanProjectDescription));
+ input.setContentType("application/x-www-form-urlencoded");
+ postRequest.setEntity(input);
+ postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+ // Make the call, get response (throws IOException if Azkaban reports an error)
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(postRequest);
+ handleResponse(response);
+
+ // Add proxy user if any
+ if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
+ Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
+ for (String user : proxyUsers) {
+ addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+ }
+ }
+
+ // Add group permissions if any (split like the proxy users: entries are trimmed, empty entries are skipped)
+ // TODO: Support users (not just groups), and different permission types
+ // (though we can add users, we only support groups at the moment and award them with admin permissions)
+ if (StringUtils.isNotBlank(groupAdminUsers)) {
+ Iterable<String> groups = SPLIT_ON_COMMA.split(groupAdminUsers);
+ for (String group : groups) {
+ addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
+ false, false);
+ }
+ }
+
+ // Upload zip file to azkaban and return projectId
+ return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+ }
+
+ public static String replaceAzkabanProject(String sessionId, String zipFilePath,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+
+ String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+ String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+ String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
+ String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
+
+ // Change project description
+ changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+
+ // Add proxy user if any
+ // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
+ // 2. Adding same proxy user multiple times is a non-issue
+ // 3. The comma separated list is trimmed and empty entries are skipped
+ if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
+ Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
+ for (String user : proxyUsers) {
+ addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+ }
+ }
+
+ // Add group permissions if any (split like the proxy users: entries are trimmed, empty entries are skipped)
+ // TODO: Support users (not just groups), and different permission types
+ // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
+ // 2. Adding same group-user will return an error message, but we will ignore it
+ // (though we can add users, we only support groups at the moment and award them with admin permissions)
+ if (StringUtils.isNotBlank(groupAdminUsers)) {
+ Iterable<String> groups = SPLIT_ON_COMMA.split(groupAdminUsers);
+ for (String group : groups) {
+ try {
+ addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
+ false);
+ } catch (IOException e) {
+ // Ignore if group already exists, we cannot list existing groups; so it is okay to attempt adding
+ // .. existing groups
+ if (!"Group permission already exists.".equalsIgnoreCase(e.getMessage())) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ // Upload zip file to azkaban and return projectId
+ return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+ }
+
+ private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String proxyUser)
+ throws IOException {
+
+ // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
+ HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&"
+ + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser));
+
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(getRequest);
+ handleResponse(response);
+ }
+
+ private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String name, boolean isGroup, boolean adminPermission, boolean readPermission, boolean writePermission,
+ boolean executePermission, boolean schedulePermission)
+ throws IOException {
+
+ // NOTE: We are not listing the permissions before adding them, because Azkaban in its current state only
+ // .. returns user permissions and not group permissions
+
+ // Create get request (adding same normal user permission multiple times will throw an error, but we cannot
+ // list whole list of permissions anyways)
+ HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&"
+ + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
+ + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name,
+ isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission));
+
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(getRequest);
+ handleResponse(response);
+ }
+
+ private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String jobZipFile)
+ throws IOException {
+
+ // Create post request
+ HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+ HttpEntity entity = MultipartEntityBuilder
+ .create()
+ .addTextBody("session.id", sessionId)
+ .addTextBody("ajax", "upload")
+ .addBinaryBody("file", new File(jobZipFile),
+ ContentType.create("application/zip"), azkabanProjectName + ".zip")
+ .addTextBody("project", azkabanProjectName)
+ .build();
+ postRequest.setEntity(entity);
+
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(postRequest);
+
+ // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
+ return handleResponse(response, "projectId").get("projectId");
+ }
+
+ public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+ String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+ String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+
+ String scheduleString = "is_recurring=off"; // run only once
+ // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
+// if (azkabanProjectConfig.isScheduled()) {
+// scheduleString = "is_recurring=on&period=1d"; // schedule once every day
+// }
+
+ // Create post request
+ HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule");
+ StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow"
+ + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
+ sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId,
+ getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR,
+ JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString));
+ input.setContentType("application/x-www-form-urlencoded");
+ postRequest.setEntity(input);
+ postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(postRequest);
+ handleResponse(response);
+ }
+
+ private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String projectDescription)
+ throws IOException {
+
+ HttpGet getRequest;
+ try {
+ // Create get request (the description is free text, so it is URL-encoded with the shared codec)
+ getRequest = new HttpGet(String
+ .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl,
+ sessionId, azkabanProjectName, codec.encode(projectDescription)));
+ } catch (EncoderException e) {
+ throw new IOException("Could not encode Azkaban project description", e);
+ }
+
+ // Make the call, get response (throws IOException if Azkaban reports an error)
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(getRequest);
+ handleResponse(response);
+ }
+
+ public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ boolean isGoUrl = false; // NOTE(review): assigned below but never read; the method has no other effect
+ if (!StringUtils.isBlank(uberdistcp2ToolServer)) {
+ if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) {
+ isGoUrl = true;
+ }
+ }
+ }
+
+ private static CloseableHttpClient getHttpClient()
+ throws IOException {
+ try {
+ // Build an SSL context from a null trust store plus TrustSelfSignedStrategy
+ SSLContextBuilder builder = new SSLContextBuilder();
+ builder.loadTrustMaterial(null, (TrustStrategy) new TrustSelfSignedStrategy());
+ SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build());
+
+ // Create client with that socket factory and a fresh cookie store; callers are expected to close it
+ return HttpClients.custom().setSSLSocketFactory(sslsf).setDefaultCookieStore(new BasicCookieStore()).build();
+ } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+ throw new IOException("Issue with creating http client", e);
+ }
+ }
+
+ private static Map<String, String> handleResponse(HttpResponse response, String... responseKeys)
+ throws IOException {
+ if (response.getStatusLine().getStatusCode() != 201 && response.getStatusLine().getStatusCode()!= 200) {
+ log.error("Failed : HTTP error code : " + response.getStatusLine().getStatusCode());
+ throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode());
+ }
+
+ // Get response in string
+ InputStream in = response.getEntity().getContent();
+ String jsonResponseString = IOUtils.toString(in, "UTF-8");
+ log.info("Response string: " + jsonResponseString);
+
+ // Parse Json
+ Map<String, String> responseMap = new HashMap<>();
+ if (StringUtils.isNotBlank(jsonResponseString)) {
+ JsonObject jsonObject = new JsonParser().parse(jsonResponseString).getAsJsonObject();
+
+ // Handle error if any
+ handleResponseError(jsonObject);
+
+ // Get required responseKeys
+ if (ArrayUtils.isNotEmpty(responseKeys)) {
+ for (String responseKey : responseKeys) {
+ responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
+ }
+ }
+ }
+
+ return responseMap;
+ }
+
+ private static void handleResponseError(JsonObject jsonObject) throws IOException {
+ // Azkaban does not have a standard tag for error messages, so both known shapes are checked: status + message
+ if (null != jsonObject.get("status") && "error".equalsIgnoreCase(jsonObject.get("status").toString()
+ .replaceAll("\"", ""))) {
+ String message = (null != jsonObject.get("message")) ?
+ jsonObject.get("message").toString().replaceAll("\"", "") : "Issue in creating project";
+ throw new IOException(message);
+ }
+
+ if (null != jsonObject.get("error")) { // .. and a bare "error" field
+ String error = jsonObject.get("error").toString().replaceAll("\"", "");
+ throw new IOException(error);
+ }
+ }
+
+ /***
+ * Generate a scheduled time in the Azkaban compatible format hh,mm,a,z (Eg. ScheduleTime=12,00,PM,PDT): now plus
+ * delayMinutes if the current time is inside the window, otherwise a random hour and minute inside the window
+ *
+ * @param windowStartHour Window start hour in 24 hr (HH) format (inclusive)
+ * @param windowEndHour Window end hour in 24 hr (HH) format (exclusive)
+ * @param delayMinutes If current time is within window, then additional delay for bootstrapping if desired
+ * @return Scheduled time string of the format hh,mm,a,z
+ */
+ public static String getScheduledTimeInAzkabanFormat(int windowStartHour, int windowEndHour, int delayMinutes) {
+ // Validate (throws IllegalArgumentException on a bad window or delay)
+ if (windowStartHour < 0 || windowEndHour > 23 || windowStartHour >= windowEndHour) {
+ throw new IllegalArgumentException("Window start should be less than window end, and both should be between "
+ + "0 and 23");
+ }
+ if (delayMinutes < 0 || delayMinutes > 59) {
+ throw new IllegalArgumentException("Delay in minutes should be between 0 and 59 (inclusive)");
+ }
+
+ // Setup window
+ Calendar windowStartTime = Calendar.getInstance();
+ windowStartTime.set(Calendar.HOUR_OF_DAY, windowStartHour);
+ windowStartTime.set(Calendar.MINUTE, 0);
+ windowStartTime.set(Calendar.SECOND, 0);
+
+ Calendar windowEndTime = Calendar.getInstance();
+ windowEndTime.set(Calendar.HOUR_OF_DAY, windowEndHour);
+ windowEndTime.set(Calendar.MINUTE, 0);
+ windowEndTime.set(Calendar.SECOND, 0);
+
+ // Check if current time is between windowStartTime and windowEndTime, then let the execution happen
+ // after delayMinutes minutes
+ Calendar now = Calendar.getInstance();
+ if (now.after(windowStartTime) && now.before(windowEndTime)) {
+ // Azkaban takes a few seconds / a minute to bootstrap, so the extra delay lets the first execution run
+ now.add(Calendar.MINUTE, delayMinutes);
+
+ return new SimpleDateFormat("hh,mm,a,z").format(now.getTime());
+ }
+
+ // Current time is outside the window, so pick a random time inside it (the date is not changed here)
+ int allowedSchedulingWindow = (int)((windowEndTime.getTimeInMillis() - windowStartTime.getTimeInMillis()) /
+ MILLISECONDS_IN_HOUR);
+ Random random = new Random(); // one generator: two generators seeded with the same millis draw correlated values
+ int randomHourInWindow = random.nextInt(allowedSchedulingWindow);
+ int randomMinute = random.nextInt(60);
+ windowStartTime.add(Calendar.HOUR, randomHourInWindow);
+ windowStartTime.set(Calendar.MINUTE, randomMinute);
+
+ return new SimpleDateFormat("hh,mm,a,z").format(windowStartTime.getTime());
+ }
+
+ private static String getScheduledDateInAzkabanFormat() {
+ // Today's date formatted as MM/dd/yyyy, Eg. ScheduleDate=07/22/2014
+ return new SimpleDateFormat("MM/dd/yyyy").format(new Date());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
new file mode 100644
index 0000000..627761e
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Lists;
+
+
+@Slf4j
+public class AzkabanJobHelper {
+
+ public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
+ try {
+ // NOTE: hacky way to determine if the project already exists, because Azkaban does not provide a way to
+ // .. check for it directly: a non-blank project id means it exists, specific error messages decide the rest
+ boolean isPresent = StringUtils.isNotBlank(AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig));
+ log.info("Project exists: " + isPresent);
+
+ return isPresent;
+ } catch (IOException e) {
+ // Project doesn't exist
+ if (String.format("Project %s doesn't exist.", azkabanProjectConfig.getAzkabanProjectName())
+ .equalsIgnoreCase(e.getMessage())) {
+ log.info("Project does not exists.");
+ return false;
+ }
+ // Project exists, but the current user has no read access to it
+ if ("Permission denied. Need READ access.".equalsIgnoreCase(e.getMessage())) {
+ log.info("Project exists, but current user does not has READ access.");
+ return true;
+ }
+ // Some other error: log it and let the caller handle it
+ log.error("Issue in checking if project is present", e);
+ throw e;
+ }
+ }
+
+ public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName());
+ String projectId = AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig);
+ log.info("Project id: " + projectId);
+
+ return projectId;
+ }
+
+ public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
+
+ // Create zip file
+ String zipFilePath = createAzkabanJobZip(azkabanProjectConfig);
+ log.info("Zip file path: " + zipFilePath);
+
+ // Upload zip file to Azkaban
+ String projectId = AzkabanAjaxAPIClient.createAzkabanProject(sessionId, zipFilePath, azkabanProjectConfig);
+ log.info("Project Id: " + projectId);
+
+ return projectId;
+ }
+
+ public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+ log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+ // Create zip file
+ String zipFilePath = createAzkabanJobZip(azkabanProjectConfig);
+ log.info("Zip file path: " + zipFilePath);
+
+ // Replace the zip file on Azkaban
+ String projectId = AzkabanAjaxAPIClient.replaceAzkabanProject(sessionId, zipFilePath, azkabanProjectConfig);
+ log.info("Project Id: " + projectId);
+
+ return projectId;
+ }
+
+ public static void scheduleJob(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Scheduling Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+ AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+ }
+
+ public static void changeJobSchedule(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Changing schedule for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+ AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+ }
+
+ public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
+ String workDir = azkabanProjectConfig.getWorkDir();
+
+ Optional<String> jarUrlTemplate = azkabanProjectConfig.getAzkabanZipJarUrlTemplate();
+ Optional<List<String>> jarNames = azkabanProjectConfig.getAzkabanZipJarNames();
+ Optional<String> jarVersion = azkabanProjectConfig.getAzkabanZipJarVersion();
+ Optional<List<String>> additionalFiles = azkabanProjectConfig.getAzkabanZipAdditionalFiles();
+ boolean failIfJarNotFound = azkabanProjectConfig.getFailIfJarNotFound();
+ String jobFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+ String zipFilename = azkabanProjectConfig.getAzkabanProjectZipFilename();
+
+ // Download the job jars (only when jar names, url template and version are all configured)
+ List<File> filesToAdd = Lists.newArrayList();
+ if (jarNames.isPresent() && jarUrlTemplate.isPresent() && jarVersion.isPresent()) {
+ String urlTemplate = jarUrlTemplate.get();
+ String version = jarVersion.get();
+ for (String jarName : jarNames.get()) {
+ String jobJarUrl = urlTemplate.replaceAll("<module-version>", version).replaceAll("<module-name>", jarName);
+ log.info("Downloading job jar from: " + jobJarUrl + " to: " + workDir);
+ File jobJarFile = null;
+ try {
+ jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl);
+ filesToAdd.add(jobJarFile);
+ } catch (IOException e) {
+ if(failIfJarNotFound) {
+ throw e;
+ }
+ log.warn("Could not download: " + jobJarUrl, e); // jobJarFile is still null here, so log the source url
+ }
+ }
+ }
+
+ // Download additional files (each entry is passed to the same downloader as the jars)
+ if (additionalFiles.isPresent()) {
+ List<String> files = additionalFiles.get();
+ for (String fileName : files) {
+ log.info("Downloading additional file from: " + fileName + " to: " + workDir);
+ File additionalFile = null;
+ try {
+ additionalFile = downloadAzkabanJobJar(workDir, fileName);
+ filesToAdd.add(additionalFile);
+ } catch (IOException e) {
+ if(failIfJarNotFound) {
+ throw e;
+ }
+ log.warn("Could not download: " + fileName, e); // additionalFile is still null here, so log the source
+ }
+ }
+ }
+
+ // Write the config files
+ log.info("Writing Azkaban config files");
+ File [] jobConfigFile = writeAzkabanConfigFiles(workDir, jobFlowName, azkabanProjectConfig);
+ filesToAdd.add(jobConfigFile[0]);
+
+ // Create the zip file
+ log.info("Writing zip file");
+ String zipfile = createZipFile(workDir, zipFilename, filesToAdd);
+ log.info("Wrote zip file: " + zipfile);
+
+ return zipfile;
+ }
+
+ private static String createZipFile(String directory, String zipFilename, List<File> filesToAdd)
+ throws IOException {
+ // Determine final zip file path
+ String zipFilePath = String.format("%s/%s", directory, zipFilename);
+ File zipFile = new File(zipFilePath);
+ zipFile.delete();
+
+ // Create and add files to zip file
+ addFilesToZip(zipFile, filesToAdd);
+
+ return zipFilePath;
+ }
+
+ private static void addFilesToZip(File zipFile, List<File> filesToAdd) throws IOException {
+ try {
+ @Cleanup
+ OutputStream archiveStream = new FileOutputStream(zipFile);
+ @Cleanup
+ ArchiveOutputStream archive =
+ new ArchiveStreamFactory().createArchiveOutputStream(ArchiveStreamFactory.ZIP, archiveStream);
+
+ for (File fileToAdd : filesToAdd) {
+ ZipArchiveEntry entry = new ZipArchiveEntry(fileToAdd.getName());
+ archive.putArchiveEntry(entry);
+
+ @Cleanup
+ BufferedInputStream input = new BufferedInputStream(new FileInputStream(fileToAdd));
+ IOUtils.copy(input, archive);
+ archive.closeArchiveEntry();
+ }
+
+ archive.finish();
+ } catch (ArchiveException e) {
+ throw new IOException("Issue with creating archive", e);
+ }
+ }
+
+ private static File[] writeAzkabanConfigFiles(String workDir, String flowName, AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ // Determine final config file path
+ String jobFilePath = String.format("%s/%s.job", workDir, flowName);
+ File jobFile = new File(jobFilePath);
+ jobFile.delete();
+
+ StringBuilder propertyFileContent = new StringBuilder();
+ for (Map.Entry entry : azkabanProjectConfig.getJobSpec().getConfigAsProperties().entrySet()) {
+ propertyFileContent.append(String.format("%s=%s", entry.getKey(), entry.getValue())).append("\n");
+ }
+
+ // Write the job file
+ FileUtils.writeStringToFile(jobFile, propertyFileContent.toString(), Charset.forName("UTF-8"),true);
+
+ return new File[] {jobFile};
+ }
+
+ private static File downloadAzkabanJobJar(String workDir, String jobJarUrl)
+ throws IOException {
+ // Determine final jar file path
+ String[] jobJarUrlParts = jobJarUrl.trim().split("/");
+ String jobJarName = jobJarUrlParts[jobJarUrlParts.length-1];
+ String jobJarFilePath = String.format("%s/%s", workDir, jobJarName);
+ File jobJarFile = new File(jobJarFilePath);
+ jobJarFile.delete();
+
+ // Create work directory if not already exists
+ FileUtils.forceMkdir(new File(workDir));
+
+ // Download jar file from artifactory
+ @Cleanup InputStream jobJarInputStream = new URL(jobJarUrl).openStream();
+ @Cleanup OutputStream jobJarOutputStream = new FileOutputStream(jobJarFile);
+ IOUtils.copy(jobJarInputStream, jobJarOutputStream);
+
+ // TODO: compare checksum
+
+ return jobJarFile;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
new file mode 100644
index 0000000..ddae3d9
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.List;
+import java.util.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+ /**
+  * Holds the Azkaban project specific configs for a single {@link JobSpec}.
+  *
+  * Values are read from the job spec's config, falling back to the defaults loaded from
+  * {@code default-service-azkaban.conf}.
+  */
+ @Getter
+ @ToString
+ @AllArgsConstructor
+ @Builder(builderMethodName = "hiddenBuilder")
+ public class AzkabanProjectConfig {
+   private static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
+
+   // Azkaban does not support project names longer than 64 chars
+   private static final int MAX_PROJECT_NAME_LENGTH = 64;
+   // Chars kept from the original name when shortening: 53 + '_' + at most 10 hash digits = 64
+   private static final int TRIMMED_PROJECT_NAME_PREFIX_LENGTH = 53;
+
+   private final String azkabanServerUrl;
+
+   private final String azkabanProjectName;
+   private final String azkabanProjectDescription;
+   private final String azkabanProjectFlowName;
+   private final String azkabanGroupAdminUsers;
+   private final Optional<String> azkabanUserToProxy;
+
+   private final Optional<List<String>> azkabanZipJarNames;
+   private final Optional<String> azkabanZipJarUrlTemplate;
+   private final Optional<String> azkabanZipJarVersion;
+   private final Optional<List<String>> azkabanZipAdditionalFiles;
+   private final Boolean failIfJarNotFound;
+
+   private final JobSpec jobSpec;
+
+   /**
+    * Builds the project config from the given job spec.
+    *
+    * @param jobSpec job spec whose config (with the default config as fallback) supplies all values
+    */
+   public AzkabanProjectConfig(JobSpec jobSpec) {
+     // Extract config objects
+     this.jobSpec = jobSpec;
+     Config defaultConfig = ConfigFactory.load(DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+     Config config = jobSpec.getConfig().withFallback(defaultConfig);
+
+     // Azkaban Infrastructure
+     this.azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+     // Azkaban Project Metadata
+     this.azkabanProjectName = constructProjectName(jobSpec, config);
+     this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
+     this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
+     this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+     this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
+
+     // Azkaban Project Zip
+     this.azkabanZipJarNames = Optional.ofNullable(ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY));
+     this.azkabanZipJarUrlTemplate = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY, null));
+     this.azkabanZipJarVersion = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY, null));
+     // The default config declares the additional files as "" (nothing to add), so a blank value is
+     // reported as absent instead of as a list of blank URLs
+     this.azkabanZipAdditionalFiles =
+         readNonBlankStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY);
+     this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false);
+   }
+
+   /**
+    * Reads a string list and reports it as absent when it holds no non-blank entry.
+    *
+    * @param config config to read from
+    * @param key key of the string list
+    * @return the list as read when at least one entry is non-blank, otherwise an empty Optional
+    */
+   private static Optional<List<String>> readNonBlankStringList(Config config, String key) {
+     List<String> values = ConfigUtils.getStringList(config, key);
+     if (values != null) {
+       for (String value : values) {
+         if (value != null && !value.trim().isEmpty()) {
+           return Optional.of(values);
+         }
+       }
+     }
+     return Optional.empty();
+   }
+
+   /**
+    * Derives the project name as {@code <prefix>_<sanitized job spec URI>}, shortened to the Azkaban limit.
+    *
+    * In the URI every '_' becomes '-' and every remaining char outside {@code [A-Za-z0-9-]} becomes '_'.
+    * A job spec without URI contributes an empty postfix.
+    */
+   private String constructProjectName(JobSpec jobSpec, Config config) {
+     String projectNamePrefix = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_NAME_PREFIX_KEY, "");
+     String projectNamePostfix = null == jobSpec.getUri() ? "" :
+         jobSpec.getUri().toString().replaceAll("_", "-").replaceAll("[^A-Za-z0-9\\-]", "_");
+
+     return trimProjectName(String.format("%s_%s", projectNamePrefix, projectNamePostfix));
+   }
+
+   /**
+    * Get Azkaban project zip file name
+    * @return Azkaban project zip file name
+    */
+   public String getAzkabanProjectZipFilename() {
+     return String.format("%s.zip", azkabanProjectName);
+   }
+
+   /**
+    * Get Azkaban project working directory, located under {@code <user.dir>/serviceAzkaban/<project name>/}
+    * and suffixed with the current time in millis. The time is read on every call, so successive calls
+    * can return different paths; callers should keep the returned value.
+    * @return Azkaban project working directory
+    */
+   public String getWorkDir() {
+     return String.format("%s/%s/%s/%s", System.getProperty("user.dir"), "serviceAzkaban", azkabanProjectName, System.currentTimeMillis());
+   }
+
+   /**
+    * Shortens a project name that exceeds the Azkaban limit of 64 chars.
+    *
+    * @param projectName candidate project name
+    * @return the name unchanged when it fits, otherwise its first 53 chars followed by '_' and the
+    *         absolute value of its hash code
+    */
+   private static String trimProjectName(String projectName) {
+     if (projectName.length() <= MAX_PROJECT_NAME_LENGTH) {
+       return projectName;
+     }
+
+     // We are using string.hashcode() so that for same path the generated project name is same (and hence checking
+     // .. for path duplicates is deterministic. Using UUID or currentMillis will produce different shortened path
+     // .. for the same path every time)
+     // The absolute value is taken on a long: negating Integer.MIN_VALUE as an int stays negative, which
+     // .. would add a '-' sign and push the result to 65 chars
+     long pathHash = Math.abs((long) projectName.hashCode());
+
+     return String.format("%s_%s", projectName.substring(0, TRIMMED_PROJECT_NAME_PREFIX_LENGTH), pathHash);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
new file mode 100644
index 0000000..65209c3
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SpecExecutorInstance;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+
+ /**
+  * {@link SpecExecutorInstance} for Azkaban that is described purely by its config: the instance URI
+  * and the comma separated list of {@code source:destination} capabilities.
+  */
+ public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
+   protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+   protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
+
+   // Executor Instance
+   protected final Config _config;
+   protected final Logger _log;
+   protected final URI _specExecutorInstanceUri;
+   protected final Map<String, String> _capabilities;
+
+   /**
+    * @param config executor config; the instance URI defaults to "NA" when not configured
+    * @param log logger to use; when absent a logger for the runtime class is created
+    * @throws RuntimeException if the configured instance URI is not a valid URI
+    * @throws IllegalArgumentException if a capability is not a single source:destination pair
+    */
+   public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) {
+     _config = config;
+     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
+     try {
+       _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
+           "NA"));
+     } catch (URISyntaxException e) {
+       throw new RuntimeException(e);
+     }
+     _capabilities = parseCapabilities(config);
+   }
+
+   /**
+    * Parses the configured capabilities ("src1:dest1,src2:dest2") into a source to destination map.
+    * Returns an empty, mutable map when no capabilities are configured.
+    */
+   private static Map<String, String> parseCapabilities(Config config) {
+     Map<String, String> parsedCapabilities = Maps.newHashMap();
+     if (!config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
+       return parsedCapabilities;
+     }
+
+     String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
+     for (String capability : SPLIT_BY_COMMA.splitToList(capabilitiesStr)) {
+       List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
+       Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported "
+           + "per capability, found: " + currentCapability);
+       parsedCapabilities.put(currentCapability.get(0), currentCapability.get(1));
+     }
+     return parsedCapabilities;
+   }
+
+   @Override
+   public URI getUri() {
+     return _specExecutorInstanceUri;
+   }
+
+   @Override
+   public Future<String> getDescription() {
+     // Name this class in the description (it used to carry the name of the class it was copied from)
+     return new CompletedFuture<>("AzkabanSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null);
+   }
+
+   @Override
+   public Future<Config> getConfig() {
+     return new CompletedFuture<>(_config, null);
+   }
+
+   @Override
+   public Future<String> getHealth() {
+     // No health check is performed, the instance always reports itself as healthy
+     return new CompletedFuture<>("Healthy", null);
+   }
+
+   @Override
+   public Future<? extends Map<String, String>> getCapabilities() {
+     return new CompletedFuture<>(_capabilities, null);
+   }
+
+   @Override
+   protected void startUp() throws Exception {
+     // nothing to do in default implementation
+   }
+
+   @Override
+   protected void shutDown() throws Exception {
+     // nothing to do in default implementation
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
new file mode 100644
index 0000000..f093af2
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+
+ /**
+  * {@link SpecExecutorInstanceProducer} that turns {@link JobSpec}s into Azkaban projects.
+  *
+  * The producer authenticates with Azkaban on construction and reuses the obtained session id for
+  * every project operation.
+  */
+ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInstance
+     implements SpecExecutorInstanceProducer<Spec>, Closeable {
+
+   // Session Id for GaaS User
+   private String sessionId;
+
+
+   /**
+    * @param config config holding the Azkaban username, password and server url
+    * @param log optional logger, see {@link AzkabanSpecExecutorInstance}
+    * @throws RuntimeException if authentication with Azkaban fails
+    */
+   public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
+     super(config, log);
+
+     try {
+       // Initialize Azkaban client / producer and cache credentials
+       String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+       String azkabanPassword = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+       String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+       sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+     } catch (IOException | EncoderException e) {
+       throw new RuntimeException("Could not authenticate with Azkaban", e);
+     }
+   }
+
+   public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) {
+     this(config, Optional.of(log));
+   }
+
+   /** Constructor with no logging */
+   public AzkabanSpecExecutorInstanceProducer(Config config) {
+     this(config, Optional.<Logger>absent());
+   }
+
+   @Override
+   public void close() throws IOException {
+     // nothing to release
+   }
+
+   /**
+    * Creates and schedules the Azkaban project for the given spec. When the project already exists it is
+    * replaced only if the spec requests overwrite, otherwise it is left untouched.
+    *
+    * @param addedSpec spec to add, must be a {@link JobSpec}
+    * @return completed future (never null) carrying this executor's config
+    * @throws RuntimeException if the project cannot be set up
+    */
+   @Override
+   public Future<?> addSpec(Spec addedSpec) {
+     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+     try {
+       _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
+
+       // Deleted project also returns true if-project-exists check, so optimistically first create the project
+       // .. (it will create project if it was never created or deleted), if project exists it will fail with
+       // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
+       // .. specified
+       try {
+         createNewAzkabanProject(sessionId, azkabanProjectConfig);
+       } catch (IOException e) {
+         if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+           if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+               ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+             _log.info("Project already exists for this Spec, but force overwrite specified");
+             updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+           } else {
+             _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+                 azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+           }
+         } else {
+           throw e;
+         }
+       }
+     } catch (IOException e) {
+       throw new RuntimeException("Issue in setting up Azkaban project.", e);
+     }
+
+     // Hand back a completed future like updateSpec does; returning null would break callers that
+     // .. wait on the result
+     return new CompletedFuture<>(_config, null);
+   }
+
+   /**
+    * Replaces the Azkaban project of the given spec and changes its schedule.
+    *
+    * @param updatedSpec spec to update, must be a {@link JobSpec}
+    * @return completed future carrying this executor's config
+    * @throws RuntimeException if the project cannot be updated
+    */
+   @Override
+   public Future<?> updateSpec(Spec updatedSpec) {
+     // Re-create project
+     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
+
+     try {
+       updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+     } catch (IOException e) {
+       throw new RuntimeException("Issue in setting up Azkaban project.", e);
+     }
+
+     return new CompletedFuture<>(_config, null);
+   }
+
+   /** Not supported yet, always throws {@link UnsupportedOperationException}. */
+   @Override
+   public Future<?> deleteSpec(URI deletedSpecURI) {
+     // Delete project
+     throw new UnsupportedOperationException();
+   }
+
+   /** Not supported yet, always throws {@link UnsupportedOperationException}. */
+   @Override
+   public Future<? extends List<Spec>> listSpecs() {
+     throw new UnsupportedOperationException();
+   }
+
+   /** Creates the Azkaban job for the project config and schedules it. */
+   private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+     // Create Azkaban Job
+     String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig);
+
+     // Schedule Azkaban Job
+     AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+     _log.info(String.format("Azkaban project created: %smanager?project=%s",
+         azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+   }
+
+   /** Looks up the existing project, replaces its job and changes its schedule. */
+   private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+     _log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(),
+         azkabanProjectConfig.getAzkabanProjectName()));
+
+     // Get project Id
+     String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig);
+
+     // Replace Azkaban Job
+     AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+     // Change schedule
+     AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
new file mode 100644
index 0000000..762561c
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+ /**
+  * Config keys understood by the Azkaban orchestrator of Gobblin Service. Every key lives under
+  * {@link #GOBBLIN_SERVICE_AZKABAN_PREFIX}.
+  */
+ public class ServiceAzkabanConfigKeys {
+   public static final String GOBBLIN_SERVICE_AZKABAN_PREFIX = "gobblin.service.azkaban.";
+
+   // Shared prefixes of the project level keys
+   private static final String PROJECT_PREFIX = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.";
+   private static final String PROJECT_ZIP_PREFIX = PROJECT_PREFIX + "zip.";
+
+   // Azkaban Session Specifics
+   public static final String AZKABAN_USERNAME_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "username";
+   public static final String AZKABAN_PASSWORD_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "password";
+   public static final String AZKABAN_SERVER_URL_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "server.url";
+
+   // Azkaban Project Metadata
+   public static final String AZKABAN_PROJECT_NAME_PREFIX_KEY = PROJECT_PREFIX + "namePrefix";
+   public static final String AZKABAN_PROJECT_DESCRIPTION_KEY = PROJECT_PREFIX + "description";
+   public static final String AZKABAN_PROJECT_USER_TO_PROXY_KEY = PROJECT_PREFIX + "userToProxy";
+   public static final String AZKABAN_PROJECT_FLOW_NAME_KEY = PROJECT_PREFIX + "flowName";
+   public static final String AZKABAN_PROJECT_GROUP_ADMINS_KEY = PROJECT_PREFIX + "groupAdmins";
+   public static final String AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY = PROJECT_PREFIX + "overwriteIfExists";
+
+   // Azkaban Project Zip
+   public static final String AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY = PROJECT_ZIP_PREFIX + "jarUrlTemplate";
+   public static final String AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY = PROJECT_ZIP_PREFIX + "jarNames";
+   public static final String AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY = PROJECT_ZIP_PREFIX + "jarVersion";
+   public static final String AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY = PROJECT_ZIP_PREFIX + "failIfJarNotFound";
+   public static final String AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY = PROJECT_ZIP_PREFIX + "additionalFilesUrl";
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
new file mode 100644
index 0000000..01a26f1
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ # Default values
+ # These values should generally come from template being used by the Service
+ gobblin.service.azkaban.project.namePrefix=GobblinService_
+ gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
+ gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
+
+ # The project zip settings default to the Maven based values declared at the end of this file
+ gobblin.service.azkaban.project.zip.jarUrlTemplate=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplate}
+ gobblin.service.azkaban.project.zip.jarNames=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules}
+ gobblin.service.azkaban.project.zip.jarVersion=${gobblin.service.azkaban.project.job.jar.mavenGobblinVersion}
+
+ gobblin.service.azkaban.project.zip.failIfJarNotFound=false
+ # Blank means: no additional files
+ gobblin.service.azkaban.project.zip.additionalFilesUrl=""
+
+ # Maven coordinates of the job jars; each module name is listed exactly once
+ gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
+ gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
+ gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template b/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template
new file mode 100644
index 0000000..e69de29
[9/9] incubator-gobblin git commit: Merge pull request #2085 from
abti/service_azkaban_orchestrator
Posted by ab...@apache.org.
Merge pull request #2085 from abti/service_azkaban_orchestrator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9b9fec81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9b9fec81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9b9fec81
Branch: refs/heads/master
Commit: 9b9fec8176007ff985a69639dd058035606c93fc
Parents: 90be15f b0c96ac
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 08:55:18 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 08:55:18 2017 -0700
----------------------------------------------------------------------
conf/service/log4j-cluster.properties | 27 -
conf/service/log4j-service.properties | 27 +
.../service-azkaban-hello-world.template | 24 +
gobblin-modules/gobblin-azkaban/build.gradle | 5 +
.../orchestration/AzkabanAjaxAPIClient.java | 507 +++++++++++++++++++
.../modules/orchestration/AzkabanJobHelper.java | 362 +++++++++++++
.../orchestration/AzkabanProjectConfig.java | 128 +++++
.../AzkabanSpecExecutorInstance.java | 108 ++++
.../AzkabanSpecExecutorInstanceProducer.java | 176 +++++++
.../orchestration/ServiceAzkabanConfigKeys.java | 42 ++
.../main/resources/default-service-azkaban.conf | 41 ++
.../orchestration/AzkabanAjaxAPIClientTest.java | 97 ++++
.../orchestration/AzkabanProjectConfigTest.java | 109 ++++
.../src/test/resources/reference.conf | 15 +
.../service-azkaban-hello-world.template | 0
.../gobblin/service/ServiceConfigKeys.java | 2 +
.../modules/core/GobblinServiceManager.java | 1 +
gradle/scripts/dependencyDefinitions.gradle | 1 +
18 files changed, 1645 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
[7/9] incubator-gobblin git commit: Tests for Azkaban Orchestrator
Posted by ab...@apache.org.
Tests for Azkaban Orchestrator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ee3e5481
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ee3e5481
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ee3e5481
Branch: refs/heads/master
Commit: ee3e5481a1ef538fd5d5610e30254cb1fc80cad1
Parents: 0bb5139
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 04:43:20 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 04:43:20 2017 -0700
----------------------------------------------------------------------
gobblin-modules/gobblin-azkaban/build.gradle | 3 +
.../orchestration/AzkabanAjaxAPIClient.java | 7 +-
.../orchestration/AzkabanProjectConfig.java | 11 +-
.../orchestration/AzkabanAjaxAPIClientTest.java | 97 +++++++++++++++++
.../orchestration/AzkabanProjectConfigTest.java | 109 +++++++++++++++++++
.../src/test/resources/reference.conf | 15 +++
6 files changed, 238 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/build.gradle b/gobblin-modules/gobblin-azkaban/build.gradle
index 4bebecc..2f1dde6 100644
--- a/gobblin-modules/gobblin-azkaban/build.gradle
+++ b/gobblin-modules/gobblin-azkaban/build.gradle
@@ -41,6 +41,9 @@ dependencies {
compile externalDependency.typesafeConfig
compile externalDependency.hadoopYarnApi
compile externalDependency.findBugsAnnotations
+
+ testCompile externalDependency.mockito
+ testCompile externalDependency.powermock
}
test {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index d0b8471..90bf005 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -51,6 +51,7 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.google.gson.JsonElement;
@@ -345,14 +346,16 @@ public class AzkabanAjaxAPIClient {
return postRequest;
}
- private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
+ @VisibleForTesting
+ protected static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
// Make the call, get response
@Cleanup CloseableHttpClient httpClient = getHttpClient();
HttpResponse response = httpClient.execute(getRequest);
return handleResponse(response);
}
- private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
+ @VisibleForTesting
+ protected static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
// Make the call, get response
@Cleanup CloseableHttpClient httpClient = getHttpClient();
HttpResponse response = httpClient.execute(postRequest);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index 2bac65d..b99683d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -24,6 +24,7 @@ import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
+import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.util.ConfigUtils;
@@ -76,8 +77,14 @@ public class AzkabanProjectConfig {
this.azkabanZipJarNames = Optional.ofNullable(ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY));
this.azkabanZipJarUrlTemplate = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY, null));
this.azkabanZipJarVersion = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY, null));
- this.azkabanZipAdditionalFiles = Optional.ofNullable(
- ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY));
+ if (config.hasPath(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY) &&
+ StringUtils.isNotBlank(config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY))) {
+ this.azkabanZipAdditionalFiles = Optional.ofNullable(
+ ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY));
+ } else {
+ this.azkabanZipAdditionalFiles = Optional.empty();
+ }
+
this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java
new file mode 100644
index 0000000..d7dda91
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link AzkabanAjaxAPIClient#getScheduledTimeInAzkabanFormat(int, int, int)}.
+ *
+ * <p>Both tests derive their scheduling window from the wall-clock hour at which they run, so the
+ * window must be valid (start strictly before end, both on the same day) for every hour of the day.
+ */
+@Slf4j
+@Test(groups = { "org.apache.gobblin.service.modules.orchestration" })
+public class AzkabanAjaxAPIClientTest {
+
+  /**
+   * Requests a schedule for a window built around the current hour and verifies that the returned
+   * time lies inside that window.
+   *
+   * @throws ParseException if the returned schedule string does not match {@code hh,mm,a,z}
+   */
+  @Test
+  public void testCurrentTimeWithinWindow()
+      throws ParseException {
+    // Current hour
+    Calendar now = Calendar.getInstance();
+    int currentHour = now.get(Calendar.HOUR_OF_DAY);
+
+    // Generate a same-day window around the current hour. Wrapping past midnight (e.g. 22 -> 1)
+    // yields start > end, which isWithinWindow() can never satisfy and which the client's own
+    // validation treats as invalid (windowStartHour >= windowEndHour), so clamp to [0, 23].
+    // During hour 23 the clamped window (21-23) ends before "now"; the assertion below is the same
+    // one used by the outside-window test, so it is still meaningful in that case.
+    int windowStartInHours = Math.max(0, currentHour - 2);
+    int windowEndInHours = Math.min(23, currentHour + 3);
+    int delayInMinutes = 0;
+
+    // Get computed scheduled time
+    String outputScheduledString =
+        AzkabanAjaxAPIClient.getScheduledTimeInAzkabanFormat(windowStartInHours, windowEndInHours, delayInMinutes);
+
+    // Verify that output schedule time is within window
+    Assert.assertTrue(isWithinWindow(windowStartInHours, windowEndInHours, outputScheduledString));
+  }
+
+  /**
+   * Requests a schedule for a window that does not contain the current hour and verifies that the
+   * returned time still lies inside the window.
+   *
+   * @throws ParseException if the returned schedule string does not match {@code hh,mm,a,z}
+   */
+  @Test
+  public void testCurrentTimeOutsideWindow()
+      throws ParseException {
+    // Current hour
+    Calendar now = Calendar.getInstance();
+    int currentHour = now.get(Calendar.HOUR_OF_DAY);
+
+    // Generate a window NOT encapsulating the current time: 01-06 when running after 10:59,
+    // 11-16 otherwise.
+    int windowStartInHours = currentHour > 10 ? 1 : 11;
+    int windowEndInHours = currentHour > 10 ? 6 : 16;
+    int delayInMinutes = 0;
+
+    // Get computed scheduled time
+    String outputScheduledString =
+        AzkabanAjaxAPIClient.getScheduledTimeInAzkabanFormat(windowStartInHours, windowEndInHours, delayInMinutes);
+
+    // Verify that output schedule time is within window
+    Assert.assertTrue(isWithinWindow(windowStartInHours, windowEndInHours, outputScheduledString));
+  }
+
+  /**
+   * Checks whether a schedule string falls inside {@code [windowStart:00, windowEnd:00)} today.
+   *
+   * @param windowStartInHours hour of day (0-23) at which the window opens
+   * @param windowEndInHours hour of day (0-23) at which the window closes
+   * @param outputScheduledString schedule string in {@code hh,mm,a,z} format
+   * @return {@code true} if the scheduled hour and minute lie inside the window
+   * @throws ParseException if {@code outputScheduledString} cannot be parsed
+   */
+  private boolean isWithinWindow(int windowStartInHours, int windowEndInHours, String outputScheduledString)
+      throws ParseException {
+    Calendar windowStart = startOfHour(windowStartInHours);
+    Calendar windowEnd = startOfHour(windowEndInHours);
+
+    // The schedule string only carries hour and minute, so compare at minute precision: drop the
+    // seconds and milliseconds that Calendar.getInstance() copies from the current instant.
+    Date outputDate = new SimpleDateFormat("hh,mm,a,z").parse(outputScheduledString);
+    Calendar receivedTime = Calendar.getInstance();
+    receivedTime.set(Calendar.HOUR_OF_DAY, Integer.parseInt(new SimpleDateFormat("HH").format(outputDate)));
+    receivedTime.set(Calendar.MINUTE, Integer.parseInt(new SimpleDateFormat("mm").format(outputDate)));
+    receivedTime.set(Calendar.SECOND, 0);
+    receivedTime.set(Calendar.MILLISECOND, 0);
+
+    SimpleDateFormat logFormat = new SimpleDateFormat("MM/dd/yyyy hh,mm,a,z");
+    log.info("Window start time is: " + logFormat.format(windowStart.getTime()));
+    log.info("Window end time is: " + logFormat.format(windowEnd.getTime()));
+    log.info("Output time is: " + logFormat.format(receivedTime.getTime()));
+
+    // Start is inclusive so that a schedule landing exactly on the window start is accepted.
+    return !receivedTime.before(windowStart) && receivedTime.before(windowEnd);
+  }
+
+  /** Returns today's date at {@code hourOfDay}:00:00.000 in the default time zone. */
+  private static Calendar startOfHour(int hourOfDay) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.set(Calendar.HOUR_OF_DAY, hourOfDay);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+    return calendar;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
new file mode 100644
index 0000000..9e189ab
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Tests the Azkaban project name and project zip file name that {@link AzkabanProjectConfig}
+ * reports for a {@link JobSpec}, with and without a configured project name prefix.
+ */
+@Slf4j
+@Test(groups = { "org.apache.gobblin.service.modules.orchestration" })
+public class AzkabanProjectConfigTest {
+
+  /** With no prefix configured, the expected project name starts with "GobblinService". */
+  @Test
+  public void testProjectNameDefault() throws Exception {
+    AzkabanProjectConfig projectConfig = buildProjectConfig(null, "uri");
+
+    Assert.assertEquals(projectConfig.getAzkabanProjectName(), "GobblinService__uri");
+  }
+
+  /** A configured prefix replaces the default one in the project name. */
+  @Test
+  public void testProjectNameWithConfig() throws Exception {
+    AzkabanProjectConfig projectConfig = buildProjectConfig("randomPrefix", "http://localhost:8000/context");
+
+    Assert.assertEquals(projectConfig.getAzkabanProjectName(), "randomPrefix_http___localhost_8000_context");
+  }
+
+  /**
+   * A long prefix plus a long URI yields a shortened name ending in a numeric suffix.
+   * NOTE(review): the suffix is presumably a hash of the full name; confirm in AzkabanProjectConfig.
+   */
+  @Test
+  public void testProjectNameWithReallyLongName() throws Exception {
+    AzkabanProjectConfig projectConfig = buildProjectConfig("randomPrefixWithReallyLongName",
+        "http://localhost:8000/context/that-keeps-expanding-and-explanding");
+
+    Assert.assertEquals(projectConfig.getAzkabanProjectName(),
+        "randomPrefixWithReallyLongName_http___localhost_8000__55490420");
+  }
+
+  /** The zip file name is the project name followed by ".zip". */
+  @Test
+  public void testProjectZipFileName() throws Exception {
+    AzkabanProjectConfig projectConfig = buildProjectConfig("randomPrefix", "http://localhost:8000/context");
+
+    Assert.assertEquals(projectConfig.getAzkabanProjectZipFilename(),
+        "randomPrefix_http___localhost_8000_context.zip");
+  }
+
+  /** The shortened long project name is also used for the zip file name. */
+  @Test
+  public void testProjectZipFileNameForLongName() throws Exception {
+    AzkabanProjectConfig projectConfig = buildProjectConfig("randomPrefixWithReallyLongName",
+        "http://localhost:8000/context/that-keeps-expanding-and-explanding");
+
+    Assert.assertEquals(projectConfig.getAzkabanProjectZipFilename(),
+        "randomPrefixWithReallyLongName_http___localhost_8000__55490420.zip");
+  }
+
+  /**
+   * Builds an {@link AzkabanProjectConfig} for a test job spec.
+   *
+   * @param namePrefix value for "gobblin.service.azkaban.project.namePrefix", or {@code null} to
+   *     leave the property unset
+   * @param specUri URI of the job spec
+   */
+  private static AzkabanProjectConfig buildProjectConfig(String namePrefix, String specUri) throws Exception {
+    Properties jobProperties = new Properties();
+    if (namePrefix != null) {
+      jobProperties.setProperty("gobblin.service.azkaban.project.namePrefix", namePrefix);
+    }
+
+    JobSpec spec = new JobSpec(new URI(specUri), "0.0", "test job spec",
+        ConfigUtils.propertiesToConfig(jobProperties), jobProperties, Optional.absent());
+    return new AzkabanProjectConfig(spec);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf b/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf
new file mode 100644
index 0000000..936b8a1
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf
@@ -0,0 +1,15 @@
+# Sample configuration properties with default values
+
+# Cluster configuration properties
+gobblin.cluster.app.name=GobblinStandaloneCluster
+gobblin.cluster.email.notification.on.shutdown=false
+gobblin.cluster.helix.instance.max.retries=2
+gobblin.cluster.work.dir=/tmp/gobblin-cluster
+
+# Helix/Zookeeper configuration properties
+gobblin.cluster.helix.cluster.name=GobblinStandaloneCluster
+gobblin.cluster.zk.connection.string="localhost:2181"
+
+fs.uri="file:///"
+
+job.execinfo.server.enabled=false
[8/9] incubator-gobblin git commit: Azkaban orchestrator findbugs fix,
addition of default config fallback, updated config values
Posted by ab...@apache.org.
Azkaban orchestrator findbugs fix, addition of default config fallback, updated config values
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b0c96aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b0c96aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b0c96aca
Branch: refs/heads/master
Commit: b0c96acabf9056b8caa8e565f737be989f449056
Parents: ee3e548
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 08:07:38 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 08:07:38 2017 -0700
----------------------------------------------------------------------
conf/service/log4j-cluster.properties | 27 ------------------
conf/service/log4j-service.properties | 27 ++++++++++++++++++
.../orchestration/AzkabanAjaxAPIClient.java | 5 +++-
.../modules/orchestration/AzkabanJobHelper.java | 30 ++++++++++++++++++--
.../orchestration/AzkabanProjectConfig.java | 4 +--
.../AzkabanSpecExecutorInstance.java | 4 ++-
.../AzkabanSpecExecutorInstanceProducer.java | 6 ++--
.../orchestration/ServiceAzkabanConfigKeys.java | 1 +
.../main/resources/default-service-azkaban.conf | 2 +-
.../gobblin/service/ServiceConfigKeys.java | 2 ++
.../modules/core/GobblinServiceManager.java | 1 +
11 files changed, 70 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/conf/service/log4j-cluster.properties
----------------------------------------------------------------------
diff --git a/conf/service/log4j-cluster.properties b/conf/service/log4j-cluster.properties
deleted file mode 100755
index a7ffb68..0000000
--- a/conf/service/log4j-cluster.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=info,stdout
-log4j.threshhold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %X{tableName} - %m%n
-
-# Suppressed loggers
-log4j.logger.org.apache.helix.controller.GenericHelixController=ERROR
-log4j.logger.org.apache.helix.controller.stages=ERROR
-log4j.logger.org.apache.helix.controller.strategy.AutoRebalanceStrategy=ERROR
-log4j.logger.org.apache.helix.manager.zk=ERROR
-log4j.logger.org.apache.helix.monitoring.mbeans.ClusterStatusMonitor=ERROR
-log4j.logger.org.apache.helix.store.zk.AutoFallbackPropertyStore=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/conf/service/log4j-service.properties
----------------------------------------------------------------------
diff --git a/conf/service/log4j-service.properties b/conf/service/log4j-service.properties
new file mode 100755
index 0000000..a7ffb68
--- /dev/null
+++ b/conf/service/log4j-service.properties
@@ -0,0 +1,27 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %X{tableName} - %m%n
+
+# Suppressed loggers
+log4j.logger.org.apache.helix.controller.GenericHelixController=ERROR
+log4j.logger.org.apache.helix.controller.stages=ERROR
+log4j.logger.org.apache.helix.controller.strategy.AutoRebalanceStrategy=ERROR
+log4j.logger.org.apache.helix.manager.zk=ERROR
+log4j.logger.org.apache.helix.monitoring.mbeans.ClusterStatusMonitor=ERROR
+log4j.logger.org.apache.helix.store.zk.AutoFallbackPropertyStore=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 90bf005..9c3cee4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -308,7 +308,7 @@ public class AzkabanAjaxAPIClient {
throws IOException {
Map<String, String> params = Maps.newHashMap();
params.put("ajax", "executeFlow");
- params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("project", azkabanProjectConfig.getAzkabanProjectName());
params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params));
@@ -454,6 +454,9 @@ public class AzkabanAjaxAPIClient {
* @param delayMinutes If current time is within window, then additional delay for bootstrapping if desired
* @return Scheduled time string of the format hh,mm,a,z
*/
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value = "DMI_RANDOM_USED_ONLY_ONCE",
+ justification = "As expected for randomization")
public static String getScheduledTimeInAzkabanFormat(int windowStartHour, int windowEndHour, int delayMinutes) {
// Validate
if (windowStartHour < 0 || windowEndHour > 23 || windowStartHour >= windowEndHour) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index a74a6ad..4fbe32b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -263,7 +263,13 @@ public class AzkabanJobHelper {
// Determine final zip file path
String zipFilePath = String.format("%s/%s", directory, zipFilename);
File zipFile = new File(zipFilePath);
- zipFile.delete();
+ if (zipFile.exists()) {
+ if (zipFile.delete()) {
+ log.info("Zipfile existed and was deleted: " + zipFilePath);
+ } else {
+ log.warn("Zipfile exists but was not deleted: " + zipFilePath);
+ }
+ }
// Create and add files to zip file
addFilesToZip(zipFile, filesToAdd);
@@ -271,6 +277,9 @@ public class AzkabanJobHelper {
return zipFilePath;
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value = "OBL_UNSATISFIED_OBLIGATION",
+ justification = "Lombok construct of @Cleanup is handing this, but not detected by FindBugs")
private static void addFilesToZip(File zipFile, List<File> filesToAdd) throws IOException {
try {
@Cleanup
@@ -300,7 +309,13 @@ public class AzkabanJobHelper {
// Determine final config file path
String jobFilePath = String.format("%s/%s.job", workDir, flowName);
File jobFile = new File(jobFilePath);
- jobFile.delete();
+ if (jobFile.exists()) {
+ if (jobFile.delete()) {
+ log.info("JobFile existed and was deleted: " + jobFilePath);
+ } else {
+ log.warn("JobFile exists but was not deleted: " + jobFilePath);
+ }
+ }
StringBuilder propertyFileContent = new StringBuilder();
for (Map.Entry entry : azkabanProjectConfig.getJobSpec().getConfigAsProperties().entrySet()) {
@@ -313,6 +328,9 @@ public class AzkabanJobHelper {
return new File[] {jobFile};
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value = "OBL_UNSATISFIED_OBLIGATION",
+ justification = "Lombok construct of @Cleanup is handing this, but not detected by FindBugs")
private static File downloadAzkabanJobJar(String workDir, String jobJarUrl)
throws IOException {
// Determine final jar file path
@@ -320,7 +338,13 @@ public class AzkabanJobHelper {
String jobJarName = jobJarUrlParts[jobJarUrlParts.length-1];
String jobJarFilePath = String.format("%s/%s", workDir, jobJarName);
File jobJarFile = new File(jobJarFilePath);
- jobJarFile.delete();
+ if (jobJarFile.exists()) {
+ if (jobJarFile.delete()) {
+ log.info("JobJarFilePath existed and was deleted: " + jobJarFilePath);
+ } else {
+ log.warn("JobJarFilePath exists but was not deleted: " + jobJarFilePath);
+ }
+ }
// Create work directory if not already exists
FileUtils.forceMkdir(new File(workDir));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index b99683d..583988b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -39,8 +39,6 @@ import com.typesafe.config.ConfigFactory;
* Class to hold Azkaban project specific configs
*/
public class AzkabanProjectConfig {
- private static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
-
private final String azkabanServerUrl;
private final String azkabanProjectName;
@@ -60,7 +58,7 @@ public class AzkabanProjectConfig {
public AzkabanProjectConfig(JobSpec jobSpec) {
// Extract config objects
this.jobSpec = jobSpec;
- Config defaultConfig = ConfigFactory.load(DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+ Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
Config config = jobSpec.getConfig().withFallback(defaultConfig);
// Azkaban Infrastructure
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
index 65209c3..dcc89cc 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
@@ -35,6 +35,7 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
@@ -48,7 +49,8 @@ public class AzkabanSpecExecutorInstance extends AbstractIdleService implements
protected final Map<String, String> _capabilities;
public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) {
- _config = config;
+ Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+ _config = config.withFallback(defaultConfig);
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
try {
_specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index f73bc6c..47df250 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -47,9 +47,9 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
try {
// Initialize Azkaban client / producer and cache credentials
- String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
- String azkabanPassword = getAzkabanPassword(config);
- String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+ String azkabanUsername = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+ String azkabanPassword = getAzkabanPassword(_config);
+ String azkabanServerUrl = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
_sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
} catch (IOException | EncoderException e) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
index b712a5a..4c24944 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -37,5 +37,6 @@ public class ServiceAzkabanConfigKeys {
// Azkaban System Environment
public static final String AZKABAN_PASSWORD_SYSTEM_KEY = "GOBBLIN_SERVICE_AZKABAN_PASSWORD";
+ public static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
index 6d31984..caf6ebe 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -37,5 +37,5 @@ gobblin.service.azkaban.project.zip.failIfJarNotFound=false
gobblin.service.azkaban.project.zip.additionalFilesUrl=""
gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
-gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin.jar,gobblin-api.jar,gobblin-compaction.jar,gobblin-config-management.jar,gobblin-core.jar,gobblin-core-base.jar,gobblin-distribution.jar,gobblin-example.jar,gobblin-hive-registration.jar,gobblin-metrics-libs.jar,gobblin-metastore.jar,gobblin-modules.jar,gobblin-rest-service.jar,gobblin-runtime.jar,gobblin-runtime-hadoop.jar,gobblin-utility.jar,gobblin-salesforce.jar,gobblin-test-harness.jar,gobblin-tunnel.jar,gobblin-data-management.jar,gobblin-config-management.jar,gobblin-audit.jar,gobblin-yarn.jar,gobblin-cluster.jar,gobblin-aws.jar,gobblin-service.jar,gobblin-test-utils.jar"
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-config-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index a6f0199..8ea19c4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -75,4 +75,6 @@ public class ServiceConfigKeys {
// Template Catalog Keys
public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
+ // Logging
+ public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 4707361..c2591e1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -37,6 +37,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
[4/9] incubator-gobblin git commit: Add ability to read and override
Azkaban password from environment
Posted by ab...@apache.org.
Add ability to read and override Azkaban password from environment
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2164a1e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2164a1e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2164a1e6
Branch: refs/heads/master
Commit: 2164a1e6a8c2cc556f070bfab11409a701bcddd5
Parents: f423d7d
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 00:36:12 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 00:36:12 2017 -0700
----------------------------------------------------------------------
.../AzkabanSpecExecutorInstanceProducer.java | 11 ++++++++++-
.../modules/orchestration/ServiceAzkabanConfigKeys.java | 3 +++
2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2164a1e6/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index f093af2..5471f0c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.Future;
import org.apache.commons.codec.EncoderException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
@@ -47,7 +48,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
try {
// Initialize Azkaban client / producer and cache credentials
String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
- String azkabanPassword = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+ String azkabanPassword = getAzkabanPassword(config);
String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
@@ -56,6 +57,14 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
}
}
+ private String getAzkabanPassword(Config config) {
+ if (StringUtils.isNotBlank(System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY))) {
+ return System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY);
+ }
+
+ return ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY, StringUtils.EMPTY);
+ }
+
public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) {
this(config, Optional.of(log));
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2164a1e6/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
index 762561c..b712a5a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -34,5 +34,8 @@ public class ServiceAzkabanConfigKeys {
public static final String AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.failIfJarNotFound";
public static final String AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.additionalFilesUrl";
public static final String AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.overwriteIfExists";
+
+ // Azkaban System Environment
+ public static final String AZKABAN_PASSWORD_SYSTEM_KEY = "GOBBLIN_SERVICE_AZKABAN_PASSWORD";
}
[5/9] incubator-gobblin git commit: Add config to support Azkaban
solo server default deployment (as per Azkaban website)
Posted by ab...@apache.org.
Add config to support Azkaban solo server default deployment (as per Azkaban website)
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e285202d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e285202d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e285202d
Branch: refs/heads/master
Commit: e285202de01f2fa2e7b0f5d8132e25fc87f7a0fb
Parents: 2164a1e
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 02:12:46 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 02:12:46 2017 -0700
----------------------------------------------------------------------
.../src/main/resources/default-service-azkaban.conf | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e285202d/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
index 01a26f1..6d31984 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -17,6 +17,14 @@
# Default values
# These values should generally come from template being used by the Service
+
+# Default config for Azkaban Infrastructure
+# (Current defaults are for default local Azkaban setup)
+gobblin.service.azkaban.username=azkaban
+gobblin.service.azkaban.password=azkaban
+gobblin.service.azkaban.server.url="http://localhost:8081/"
+
+# Default config for Azkaban Projects
gobblin.service.azkaban.project.namePrefix=GobblinService_
gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
@@ -29,5 +37,5 @@ gobblin.service.azkaban.project.zip.failIfJarNotFound=false
gobblin.service.azkaban.project.zip.additionalFilesUrl=""
gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
-gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-config-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin.jar,gobblin-api.jar,gobblin-compaction.jar,gobblin-config-management.jar,gobblin-core.jar,gobblin-core-base.jar,gobblin-distribution.jar,gobblin-example.jar,gobblin-hive-registration.jar,gobblin-metrics-libs.jar,gobblin-metastore.jar,gobblin-modules.jar,gobblin-rest-service.jar,gobblin-runtime.jar,gobblin-runtime-hadoop.jar,gobblin-utility.jar,gobblin-salesforce.jar,gobblin-test-harness.jar,gobblin-tunnel.jar,gobblin-data-management.jar,gobblin-config-management.jar,gobblin-audit.jar,gobblin-yarn.jar,gobblin-cluster.jar,gobblin-aws.jar,gobblin-service.jar,gobblin-test-utils.jar"
gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file
[6/9] incubator-gobblin git commit: Support to execute Azkaban project
from Orchestrator
Posted by ab...@apache.org.
Support to execute Azkaban project from Orchestrator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0bb5139c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0bb5139c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0bb5139c
Branch: refs/heads/master
Commit: 0bb5139c8822ded33295b9eb118b67df1cb9f418
Parents: e285202
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 03:36:04 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 03:36:04 2017 -0700
----------------------------------------------------------------------
.../orchestration/AzkabanAjaxAPIClient.java | 340 +++++++++++--------
.../modules/orchestration/AzkabanJobHelper.java | 70 +++-
.../orchestration/AzkabanProjectConfig.java | 2 +-
.../AzkabanSpecExecutorInstanceProducer.java | 59 ++--
gradle/scripts/dependencyDefinitions.gradle | 1 +
5 files changed, 307 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 31fc753..d0b8471 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
@@ -37,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.EncoderException;
import org.apache.commons.codec.net.URLCodec;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -55,13 +52,14 @@ import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@Slf4j
public class AzkabanAjaxAPIClient {
-
private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
// TODO: Ensure GET call urls do not grow too big
@@ -71,93 +69,107 @@ public class AzkabanAjaxAPIClient {
private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
private static final URLCodec codec = new URLCodec();
+ /***
+ * Authenticate a user and obtain a session.id from response. Once a session.id has been obtained,
+ * until the session expires, this id can be used to do any API requests with a proper permission granted.
+ * A session expires if the user logs out, changes machine, browser or location, if Azkaban is restarted,
+ * or if the session times out. The default session timeout is 24 hours (one day). A user can re-login irrespective
+ * of whether the session has expired or not. For the same user, a new session will always override the old one.
+ * @param username Username.
+ * @param password Password.
+ * @param azkabanServerUrl Azkaban Server Url.
+ * @return Session Id.
+ * @throws IOException
+ * @throws EncoderException
+ */
public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
throws IOException, EncoderException {
// Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl);
- StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
- username, codec.encode(password)));
- input.setContentType("application/x-www-form-urlencoded");
- postRequest.setEntity(input);
- postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+ Map<String, String> params = Maps.newHashMap();
+ params.put("action", "login");
+ params.put("username", username);
+ params.put("password", codec.encode(password));
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(postRequest);
-
- return handleResponse(response, "session.id").get("session.id");
+ return executePostRequest(preparePostRequest(azkabanServerUrl, null, params)).get("session.id");
}
+ /***
+ * Get project.id for a Project Name.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
// Note: Every get call to Azkaban provides a projectId in response, so we are using the fetchProjectFlows call
// .. because it does not need any additional params other than project name
- // Create get request
- HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&"
- + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId,
- azkabanProjectConfig.getAzkabanProjectName()));
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "fetchprojectflows");
+ params.put("project", azkabanProjectConfig.getAzkabanProjectName());
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- return handleResponse(response, "projectId").get("projectId");
+ return executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager",
+ sessionId, params)).get("projectId");
}
+ /***
+ * Creates an Azkaban project and uploads the zip file. If proxy user and group permissions are specified in
+ * Azkaban Project Config, then this method also adds it to the project configuration.
+ * @param sessionId Session Id.
+ * @param zipFilePath Zip file to upload.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String createAzkabanProject(String sessionId, String zipFilePath,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "executeFlow");
+ params.put("name", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("description", azkabanProjectConfig.getAzkabanProjectDescription());
- String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
- String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
- String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
- String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
- // Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
- StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId,
- azkabanProjectName, azkabanProjectDescription));
- input.setContentType("application/x-www-form-urlencoded");
- postRequest.setEntity(input);
- postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
-
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(postRequest);
- handleResponse(response);
+ executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() +
+ "/manager?action=create", sessionId, params));
// Add proxy user if any
if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
for (String user : proxyUsers) {
- addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+ addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), user);
}
}
// Add group permissions if any
// TODO: Support users (not just groups), and different permission types
// (though we can add users, we only support groups at the moment and award them with admin permissions)
- if (StringUtils.isNotBlank(groupAdminUsers)) {
- String [] groups = StringUtils.split(groupAdminUsers, ",");
+ if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+ String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
for (String group : groups) {
- addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
- false, false);
+ addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(),
+ group, true, true, false, false,false,
+ false);
}
}
// Upload zip file to azkaban and return projectId
- return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+ return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
}
+ /***
+ * Replace an existing Azkaban Project. If proxy user and group permissions are specified in
+ * Azkaban Project Config, then this method also adds it to the project configuration.
+ * @param sessionId Session Id.
+ * @param zipFilePath Zip file to upload.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String replaceAzkabanProject(String sessionId, String zipFilePath,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
-
- String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
- String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
- String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
- String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
// Change project description
- changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+ changeProjectDescription(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), azkabanProjectConfig.getAzkabanProjectDescription());
// Add proxy user if any
// Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
@@ -166,7 +178,8 @@ public class AzkabanAjaxAPIClient {
if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
for (String user : proxyUsers) {
- addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+ addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), user);
}
}
@@ -175,12 +188,13 @@ public class AzkabanAjaxAPIClient {
// Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
// 2. Adding same group-user will return an error message, but we will ignore it
// (though we can add users, we only support groups at the moment and award them with admin permissions)
- if (StringUtils.isNotBlank(groupAdminUsers)) {
- String [] groups = StringUtils.split(groupAdminUsers, ",");
+ if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+ String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
for (String group : groups) {
try {
- addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
- false);
+ addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), group, true, true,
+ false, false, false,false);
} catch (IOException e) {
// Ignore if group already exists, we cannot list existing groups; so it's okay to attempt adding existing
// .. groups
@@ -192,21 +206,20 @@ public class AzkabanAjaxAPIClient {
}
// Upload zip file to azkaban and return projectId
- return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+ return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+ azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
}
private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
String proxyUser)
throws IOException {
-
// Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
- HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&"
- + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser));
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "addProxyUser");
+ params.put("project", azkabanProjectName);
+ params.put("name", proxyUser);
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- handleResponse(response);
+ executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
}
private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
@@ -219,100 +232,155 @@ public class AzkabanAjaxAPIClient {
// Create get request (adding same normal user permission multiple times will throw an error, but we cannot
// list whole list of permissions anyways)
- HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&"
- + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
- + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name,
- isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission));
-
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- handleResponse(response);
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "addPermission");
+ params.put("project", azkabanProjectName);
+ params.put("name", name);
+ params.put("group", Boolean.toString(isGroup));
+ params.put("permissions[admin]", Boolean.toString(adminPermission));
+ params.put("permissions[read]", Boolean.toString(readPermission));
+ params.put("permissions[write]", Boolean.toString(writePermission));
+ params.put("permissions[execute]", Boolean.toString(executePermission));
+ params.put("permissions[schedule]", Boolean.toString(schedulePermission));
+
+ executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
}
- private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
- String jobZipFile)
+ /***
+ * Schedule the Azkaban Project to run with a schedule.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @throws IOException
+ */
+ public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "scheduleFlow");
+ params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+ params.put("projectId", azkabanProjectId);
+ params.put("scheduleTime", getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR,
+ LOW_NETWORK_TRAFFIC_END_HOUR, JOB_START_DELAY_MINUTES));
+ params.put("scheduleDate", getScheduledDateInAzkabanFormat());
+ params.put("is_recurring", "off");
+
+ // Run once OR push down schedule (TODO: Enable when push down is finalized)
+ // if (azkabanProjectConfig.isScheduled()) {
+ // params.put("is_recurring", "on");
+ // params.put("period", "1d");
+ // } else {
+ // params.put("is_recurring", "off");
+ // }
+
+ executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/schedule", sessionId, params));
+ }
- // Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
- HttpEntity entity = MultipartEntityBuilder
- .create()
- .addTextBody("session.id", sessionId)
- .addTextBody("ajax", "upload")
- .addBinaryBody("file", new File(jobZipFile),
- ContentType.create("application/zip"), azkabanProjectName + ".zip")
- .addTextBody("project", azkabanProjectName)
- .build();
- postRequest.setEntity(entity);
+ private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String projectDescription)
+ throws IOException {
+ String encodedProjectDescription;
+ try {
+ encodedProjectDescription = new URLCodec().encode(projectDescription);
+ } catch (EncoderException e) {
+ throw new IOException("Could not encode Azkaban project description", e);
+ }
- // Make the call, get response
- @Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(postRequest);
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "changeDescription");
+ params.put("project", azkabanProjectName);
+ params.put("description", encodedProjectDescription);
- // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
- return handleResponse(response, "projectId").get("projectId");
+ executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
}
- public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+ /***
+ * Execute an existing Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @throws IOException
+ */
+ public static void executeAzkabanProject(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
- String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
- String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
- String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+ Map<String, String> params = Maps.newHashMap();
+ params.put("ajax", "executeFlow");
+ params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+ params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+
+ executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params));
+ }
+
+ private static HttpGet prepareGetRequest(String requestUrl, String sessionId, Map<String, String> params)
+ throws IOException {
+ // Create get request
+ StringBuilder stringEntityBuilder = new StringBuilder();
+ stringEntityBuilder.append(String.format("?session.id=%s", sessionId));
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ stringEntityBuilder.append(String.format("&%s=%s", entry.getKey(), entry.getValue()));
+ }
- String scheduleString = "is_recurring=off"; // run only once
- // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
-// if (azkabanProjectConfig.isScheduled()) {
-// scheduleString = "is_recurring=on&period=1d"; // schedule once every day
-// }
+ return new HttpGet(requestUrl + stringEntityBuilder);
+ }
+ private static HttpPost preparePostRequest(String requestUrl, String sessionId, Map<String, String> params)
+ throws IOException {
// Create post request
- HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule");
- StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow"
- + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
- sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId,
- getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR,
- JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString));
+ HttpPost postRequest = new HttpPost(requestUrl);
+ StringBuilder stringEntityBuilder = new StringBuilder();
+ stringEntityBuilder.append(String.format("session.id=%s", sessionId));
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ if (stringEntityBuilder.length() > 0) {
+ stringEntityBuilder.append("&");
+ }
+ stringEntityBuilder.append(String.format("%s=%s", entry.getKey(), entry.getValue()));
+ }
+ StringEntity input = new StringEntity(stringEntityBuilder.toString());
input.setContentType("application/x-www-form-urlencoded");
postRequest.setEntity(input);
postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+ return postRequest;
+ }
+
+ private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
+ // Make the call, get response
+ @Cleanup CloseableHttpClient httpClient = getHttpClient();
+ HttpResponse response = httpClient.execute(getRequest);
+ return handleResponse(response);
+ }
+
+ private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
// Make the call, get response
@Cleanup CloseableHttpClient httpClient = getHttpClient();
HttpResponse response = httpClient.execute(postRequest);
- handleResponse(response);
+ return handleResponse(response);
}
- private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
- String projectDescription)
+ private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+ String jobZipFile)
throws IOException {
- HttpGet getRequest;
- try {
- // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
- getRequest = new HttpGet(String
- .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl,
- sessionId, azkabanProjectName, new URLCodec().encode(projectDescription)));
- } catch (EncoderException e) {
- throw new IOException("Could not encode Azkaban project description", e);
- }
+ // Create post request
+ HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+ HttpEntity entity = MultipartEntityBuilder
+ .create()
+ .addTextBody("session.id", sessionId)
+ .addTextBody("ajax", "upload")
+ .addBinaryBody("file", new File(jobZipFile),
+ ContentType.create("application/zip"), azkabanProjectName + ".zip")
+ .addTextBody("project", azkabanProjectName)
+ .build();
+ postRequest.setEntity(entity);
// Make the call, get response
@Cleanup CloseableHttpClient httpClient = getHttpClient();
- HttpResponse response = httpClient.execute(getRequest);
- handleResponse(response);
- }
+ HttpResponse response = httpClient.execute(postRequest);
- public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
- AzkabanProjectConfig azkabanProjectConfig)
- throws IOException {
- boolean isGoUrl = false;
- if (!StringUtils.isBlank(uberdistcp2ToolServer)) {
- if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) {
- isGoUrl = true;
- }
- }
+ // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
+ return handleResponse(response, "projectId").get("projectId");
}
private static CloseableHttpClient getHttpClient()
@@ -350,11 +418,9 @@ public class AzkabanAjaxAPIClient {
// Handle error if any
handleResponseError(jsonObject);
- // Get required responseKeys
- if (ArrayUtils.isNotEmpty(responseKeys)) {
- for (String responseKey : responseKeys) {
- responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
- }
+ // Get all responseKeys
+ for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+ responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", ""));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index 627761e..a74a6ad 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -46,6 +46,13 @@ import com.google.common.collect.Lists;
@Slf4j
public class AzkabanJobHelper {
+ /***
+ * Checks if an Azkaban project exists by name.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains project name.
+ * @return true if project exists else false.
+ * @throws IOException
+ */
public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
@@ -74,6 +81,13 @@ public class AzkabanJobHelper {
}
}
+ /***
+ * Get Project Id by an Azkaban Project Name.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains project Name.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -83,6 +97,14 @@ public class AzkabanJobHelper {
return projectId;
}
+ /***
+ * Create project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+ * Azkaban, setting permissions and schedule.
+ * @param sessionId Session Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -98,6 +120,15 @@ public class AzkabanJobHelper {
return projectId;
}
+ /***
+ * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+ * Azkaban, setting permissions and schedule.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config.
+ * @return Project Id.
+ * @throws IOException
+ */
public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig) throws IOException {
log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -113,6 +144,13 @@ public class AzkabanJobHelper {
return projectId;
}
+ /***
+ * Schedule an already created Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+ * @throws IOException
+ */
public static void scheduleJob(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
@@ -120,6 +158,13 @@ public class AzkabanJobHelper {
AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
}
+ /***
+ * Change the schedule of an already created Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+ * @throws IOException
+ */
public static void changeJobSchedule(String sessionId, String azkabanProjectId,
AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
@@ -127,7 +172,28 @@ public class AzkabanJobHelper {
AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
}
- public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+ /***
+ * Execute an already created Azkaban project.
+ * @param sessionId Session Id.
+ * @param azkabanProjectId Project Id.
+ * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+ * @throws IOException
+ */
+ public static void executeJob(String sessionId, String azkabanProjectId,
+ AzkabanProjectConfig azkabanProjectConfig)
+ throws IOException {
+ log.info("Executing Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+ AzkabanAjaxAPIClient.executeAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+ }
+
+ /***
+ * Create Azkaban project zip file.
+ * @param azkabanProjectConfig Azkaban Project Config that contains information about what to include in
+ * zip file.
+ * @return Zip file path.
+ * @throws IOException
+ */
+ private static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
throws IOException {
log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
String workDir = azkabanProjectConfig.getWorkDir();
@@ -153,7 +219,7 @@ public class AzkabanJobHelper {
jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl);
filesToAdd.add(jobJarFile);
} catch (IOException e) {
- if(failIfJarNotFound) {
+ if (failIfJarNotFound) {
throw e;
}
log.warn("Could not download: " + jobJarFile);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index ddae3d9..2bac65d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -69,7 +69,7 @@ public class AzkabanProjectConfig {
this.azkabanProjectName = constructProjectName(jobSpec, config);
this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
- this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+ this.azkabanGroupAdminUsers = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY, "");
this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
// Azkaban Project Zip
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index 5471f0c..f73bc6c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -39,7 +39,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
implements SpecExecutorInstanceProducer<Spec>, Closeable {
// Session Id for GaaS User
- private String sessionId;
+ private String _sessionId;
public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
@@ -51,7 +51,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
String azkabanPassword = getAzkabanPassword(config);
String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
- sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+ _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
} catch (IOException | EncoderException e) {
throw new RuntimeException("Could not authenticate with Azkaban", e);
}
@@ -82,37 +82,46 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
@Override
public Future<?> addSpec(Spec addedSpec) {
// If project already exists, execute it
-
- // If project does not already exists, create and execute it
- AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
try {
- _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
-
- // Deleted project also returns true if-project-exists check, so optimistically first create the project
- // .. (it will create project if it was never created or deleted), if project exists it will fail with
- // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
- // .. specified
- try {
- createNewAzkabanProject(sessionId, azkabanProjectConfig);
- } catch (IOException e) {
- if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
- if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
- ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
- _log.info("Project already exists for this Spec, but force overwrite specified");
- updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+ AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+ boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
+
+ // If project does not already exist, create and execute it
+ if (azkabanProjectExists) {
+ _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+ AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
+ azkabanProjectConfig);
+ } else {
+ _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+ // Deleted project also returns true if-project-exists check, so optimistically first create the project
+ // .. (it will create project if it was never created or deleted), if project exists it will fail with
+ // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
+ // .. specified
+ try {
+ createNewAzkabanProject(_sessionId, azkabanProjectConfig);
+ } catch (IOException e) {
+ if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+ if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+ ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+ _log.info("Project already exists for this Spec, but force overwrite specified");
+ updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+ } else {
+ _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+ azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+ }
} else {
- _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
- azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+ throw e;
}
- } else {
- throw e;
}
}
+
+
} catch (IOException e) {
throw new RuntimeException("Issue in setting up Azkaban project.", e);
}
- return null;
+ return new CompletedFuture<>(_config, null);
}
@Override
@@ -121,7 +130,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
try {
- updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+ updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
} catch (IOException e) {
throw new RuntimeException("Issue in setting up Azkaban project.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 41e1485..1e86b96 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -69,6 +69,7 @@ ext.externalDependency = [
"hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
"hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
"httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
+ "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
"httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
"httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
"jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",
[2/9] incubator-gobblin git commit: Azkaban Orchestrator gradle
dependencies
Posted by ab...@apache.org.
Azkaban Orchestrator gradle dependencies
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9864f69b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9864f69b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9864f69b
Branch: refs/heads/master
Commit: 9864f69bd3054d8e7dced61e1d1f60544c87775a
Parents: 08e60ef
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Tue Aug 8 12:42:41 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 8 12:42:41 2017 -0700
----------------------------------------------------------------------
gobblin-modules/gobblin-azkaban/build.gradle | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9864f69b/gobblin-modules/gobblin-azkaban/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/build.gradle b/gobblin-modules/gobblin-azkaban/build.gradle
index d2010ba..4bebecc 100644
--- a/gobblin-modules/gobblin-azkaban/build.gradle
+++ b/gobblin-modules/gobblin-azkaban/build.gradle
@@ -33,6 +33,8 @@ dependencies {
compile externalDependency.log4j
compile externalDependency.guava
compile externalDependency.commonsLang
+ compile externalDependency.httpclient
+ compile externalDependency.httpmime
compile externalDependency.jodaTime
compile externalDependency.lombok
compile externalDependency.slf4j
[3/9] incubator-gobblin git commit: Merge branch 'master' into
service_azkaban_orchestrator
Posted by ab...@apache.org.
Merge branch 'master' into service_azkaban_orchestrator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f423d7da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f423d7da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f423d7da
Branch: refs/heads/master
Commit: f423d7da42fe564ef67d4214bf2d31ef2bf1a32c
Parents: 9864f69 90be15f
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 00:20:42 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 00:20:42 2017 -0700
----------------------------------------------------------------------
conf/log4j-compaction.xml | 2 +-
conf/log4j-mapreduce.xml | 4 +-
config/checkstyle/checkstyle.xml | 16 +
config/checkstyle/suppressions.xml | 10 +
.../static/js/collections/job-executions.js | 2 +-
.../extractor/CheckpointableWatermark.java | 26 +-
.../gobblin_scopes/GobblinScopeInstance.java | 3 +-
.../gobblin_scopes/GobblinScopeTypes.java | 3 +-
.../broker/gobblin_scopes/JobScopeInstance.java | 3 +-
.../gobblin_scopes/TaskScopeInstance.java | 3 +-
.../broker/iface/SubscopedBrokerBuilder.java | 3 +-
.../configuration/ConfigurationKeys.java | 29 ++
.../org/apache/gobblin/dataset/Dataset.java | 12 +-
.../apache/gobblin/dataset/DatasetsFinder.java | 1 +
.../gobblin/dataset/IterableDatasetFinder.java | 24 +
.../gobblin/dataset/PartitionableDataset.java | 61 +++
.../apache/gobblin/dataset/URNIdentified.java | 29 ++
.../URNLexicographicalComparator.java | 53 +++
.../dataset/comparators/package-info.java | 21 +
.../dataset/test/SimpleDatasetForTesting.java | 38 ++
.../test/SimpleDatasetPartitionForTesting.java | 35 ++
.../SimplePartitionableDatasetForTesting.java | 56 +++
.../test/StaticDatasetsFinderForTesting.java | 61 +++
.../org/apache/gobblin/fork/CopyHelper.java | 27 +-
.../extractor/CheckpointableWatermark.java | 29 +-
.../source/extractor/StreamingExtractor.java | 27 +-
.../source/workunit/BasicWorkUnitStream.java | 13 +-
.../apache/gobblin/writer/WatermarkStorage.java | 27 +-
.../java/com/linkedin/gobblin/TestAlias.java | 19 +-
.../gobblin/ack/HierarchicalAckableTest.java | 1 -
.../gobblin_scopes/GobblinScopesTest.java | 3 +-
.../org/apache/gobblin/fork/CopyHelperTest.java | 27 +-
.../gobblin/aws/AWSJobConfigurationManager.java | 3 +-
.../gobblin/aws/GobblinAWSClusterManager.java | 6 +-
.../gobblin/aws/GobblinAWSTaskRunner.java | 6 +-
.../gobblin/cluster/GobblinClusterManager.java | 5 +-
.../gobblin/cluster/GobblinHelixConstants.java | 26 +-
.../apache/gobblin/cluster/GobblinHelixJob.java | 45 +-
.../cluster/GobblinHelixJobLauncher.java | 17 +-
.../cluster/GobblinHelixMessagingService.java | 16 +
.../gobblin/cluster/GobblinHelixTaskDriver.java | 26 +-
.../gobblin/cluster/GobblinTaskRunner.java | 12 +-
.../apache/helix/task/GobblinJobRebalancer.java | 26 +-
.../mapreduce/MRCompactionTaskFactory.java | 16 +
.../compaction/verify/CompactionVerifier.java | 16 +
.../verify/InputRecordCountHelper.java | 16 +
.../mapreduce/RenameSourceDirectoryTest.java | 16 +
.../suite/TestCompactionSuiteFactories.java | 16 +
.../verify/PinotAuditCountVerifierTest.java | 16 +
.../FineGrainedWatermarkTrackerBenchmark.java | 19 +-
.../java/org/apache/gobblin/async/Callback.java | 16 +
.../gobblin/converter/SamplingConverter.java | 27 +-
...blinTrackingEventFlattenFilterConverter.java | 1 -
.../DefaultCheckpointableWatermark.java | 27 +-
.../limiter/LimiterConfigurationKeys.java | 16 +
.../apache/gobblin/test/AnyToJsonConverter.java | 27 +-
.../gobblin/test/AnyToStringConverter.java | 27 +-
.../org/apache/gobblin/test/TestRecord.java | 27 +-
.../writer/AcknowledgableRecordEnvelope.java | 27 +-
.../gobblin/writer/AcknowledgableWatermark.java | 27 +-
.../gobblin/writer/AsyncWriterManager.java | 27 +-
.../java/org/apache/gobblin/writer/Batch.java | 27 +-
.../apache/gobblin/writer/BatchAccumulator.java | 16 +
.../gobblin/writer/BatchAsyncDataWriter.java | 27 +-
.../gobblin/writer/BufferedAsyncDataWriter.java | 27 +-
.../gobblin/writer/BytesBoundedBatch.java | 27 +-
.../writer/FineGrainedWatermarkTracker.java | 27 +-
.../writer/FutureWrappedWriteCallback.java | 27 +-
.../gobblin/writer/GenericWriteResponse.java | 27 +-
.../writer/GenericWriteResponseWrapper.java | 27 +-
.../gobblin/writer/LastWatermarkTracker.java | 27 +-
.../writer/MultiWriterWatermarkManager.java | 27 +-
.../writer/MultiWriterWatermarkTracker.java | 27 +-
.../org/apache/gobblin/writer/RecordFuture.java | 27 +-
.../apache/gobblin/writer/RecordMetadata.java | 27 +-
.../writer/SequentialBasedBatchAccumulator.java | 27 +-
.../apache/gobblin/writer/SyncDataWriter.java | 27 +-
.../writer/TrackerBasedWatermarkManager.java | 27 +-
.../gobblin/writer/WatermarkAwareWriter.java | 27 +-
.../writer/WatermarkAwareWriterWrapper.java | 27 +-
.../apache/gobblin/writer/WatermarkManager.java | 27 +-
.../apache/gobblin/writer/WatermarkTracker.java | 27 +-
.../gobblin/writer/WatermarkTrackerFactory.java | 27 +-
.../apache/gobblin/writer/WriteResponse.java | 27 +-
.../gobblin/writer/WriteResponseFuture.java | 27 +-
.../gobblin/writer/WriteResponseMapper.java | 27 +-
.../converter/SamplingConverterTest.java | 26 +-
.../writer/FineGrainedWatermarkTrackerTest.java | 26 +-
.../writer/MultiWriterWatermarkManagerTest.java | 26 +-
.../gobblin/writer/WatermarkTrackerTest.java | 26 +-
...nElementConversionWithAvroSchemaFactory.java | 166 +++++++
.../JsonRecordAvroSchemaToAvroConverter.java | 128 ++++++
.../converter/csv/CsvToJsonConverter.java | 48 +-
.../converter/json/BytesToJsonConverter.java | 51 +++
.../gobblin/lineage/LineageException.java | 39 ++
.../org/apache/gobblin/lineage/LineageInfo.java | 234 ++++++++++
.../gobblin/publisher/BaseDataPublisher.java | 18 +-
.../publisher/HiveRegistrationPublisher.java | 36 +-
.../source/DatePartitionedDailyAvroSource.java | 19 +-
.../extractor/extract/QueryBasedSource.java | 6 +
.../source/extractor/partition/Partition.java | 16 +
.../writer/CloseOnFlushWriterWrapper.java | 15 +-
.../gobblin/writer/PartitionedDataWriter.java | 8 +-
.../gobblin/async/AsyncDataDispatcherTest.java | 16 +
.../avro/FlattenNestedKeyConverterTest.java | 16 +
...JsonRecordAvroSchemaToAvroConverterTest.java | 81 ++++
.../json/BytesToJsonConverterTest.java | 43 ++
.../string/ObjectToStringConverterTest.java | 1 -
.../apache/gobblin/lineage/LineageInfoTest.java | 160 +++++++
.../publisher/BaseDataPublisherTest.java | 32 ++
.../AvroGenericRecordAccessorTest.java | 26 +-
.../RecordAccessorProviderFactoryTest.java | 26 +-
.../security/ssl/SSLContextFactoryTest.java | 16 +
.../DatePartitionedAvroFileExtractorTest.java | 19 +-
.../extract/QueryBasedExtractorTest.java | 16 +
.../extractor/partition/PartitionerTest.java | 16 +
.../writer/CloseOnFlushWriterWrapperTest.java | 77 +++-
.../writer/MetadataWriterWrapperTest.java | 16 +
.../gobblin/writer/PartitionedWriterTest.java | 37 ++
.../resources/converter/jsonToAvroRecord.json | 13 +
.../resources/converter/jsonToAvroSchema.avsc | 50 +++
.../management/copy/OwnerAndPermission.java | 29 ++
.../converter/AbstractAvroToOrcConverter.java | 2 +
.../management/copy/CloseableFsCopySource.java | 2 +-
.../data/management/copy/CopyConfiguration.java | 5 +
.../data/management/copy/CopyEntity.java | 17 +-
.../data/management/copy/CopySource.java | 14 +-
.../copy/CopyableDatasetMetadata.java | 16 +-
.../data/management/copy/CopyableFile.java | 12 +-
.../copy/RecursiveCopyableDataset.java | 11 +-
.../management/copy/RecursivePathFinder.java | 9 +-
.../FileAwareInputStreamExtractor.java | 4 +
.../copy/hive/HivePartitionFileSet.java | 3 +-
.../copy/hive/UnpartitionedTableFileSet.java | 2 +-
.../copy/publisher/CopyDataPublisher.java | 8 +
.../replication/ConfigBasedMultiDatasets.java | 15 +-
.../replication/CopyRouteGeneratorBase.java | 7 +
.../copy/replication/DataFlowTopology.java | 10 +
.../replication/ReplicationConfiguration.java | 2 +
.../writer/FileAwareInputStreamDataWriter.java | 7 +
.../data/management/dataset/DatasetUtils.java | 9 +
.../dataset/DefaultFileSystemGlobFinder.java | 16 +
.../management/source/DatasetFinderSource.java | 141 ++++++
.../source/LoopingDatasetFinderSource.java | 226 ++++++++++
.../hive/CopyPartitionParametersTest.java | 16 +
.../copy/ConcurrentBoundedWorkUnitListTest.java | 2 +-
.../copy/CopySourcePrioritizationTest.java | 2 +-
.../data/management/copy/CopyableFileTest.java | 4 +-
.../data/management/copy/CopyableFileUtils.java | 4 +-
.../source/DatasetFinderSourceTest.java | 137 ++++++
.../source/LoopingDatasetFinderSourceTest.java | 218 ++++++++++
gobblin-docs/Powered-By.md | 15 +-
.../case-studies/Kafka-HDFS-Ingestion.md | 4 +-
.../hive/HiveMetaStoreClientFactoryTest.java | 16 +
.../gobblin/metastore/DatasetStateStore.java | 64 +++
.../gobblin/metastore/DatasetStoreDataset.java | 56 +++
.../metastore/DatasetStoreDatasetFinder.java | 93 ++++
.../apache/gobblin/metastore/FsStateStore.java | 13 +-
.../apache/gobblin/metastore/StateStore.java | 19 +
.../metadata/DatasetStateStoreEntryManager.java | 60 +++
.../metadata/StateStoreEntryManager.java | 63 +++
.../metastore/predicates/DatasetPredicate.java | 55 +++
.../predicates/StateStorePredicate.java | 43 ++
.../predicates/StoreNamePredicate.java | 44 ++
.../config/checkstyle/suppressions.xml | 10 +
.../gobblin/metrics/event/sla/SlaEventKeys.java | 1 +
...roToJsonRecordWithMetadataConverterTest.java | 16 +
.../retention/Avro2OrcStaleDatasetCleaner.java | 16 +
.../compliance/purger/HivePurgerWriterTest.java | 16 +
.../converter/AnyToCouchbaseJsonConverter.java | 26 +-
.../writer/CouchbaseEnvironmentFactory.java | 26 +-
.../AnyToCouchbaseJsonConverterTest.java | 26 +-
.../AvroStringFieldEncryptorConverter.java | 26 +-
...ordToEncryptedSerializedRecordConverter.java | 26 +-
.../StringFieldEncryptorConverter.java | 26 +-
.../AvroStringFieldEncryptorConverterTest.java | 16 +
.../crypto/GobblinEncryptionProviderTest.java | 16 +
...oEncryptedSerializedRecordConverterBase.java | 26 +-
.../apache/gobblin/crypto/GPGFileDecryptor.java | 133 +++++-
.../gobblin/crypto/GPGFileDecryptorTest.java | 61 +++
.../crypto/gpg/KeyBasedEncryptionFile.txt.gpg | Bin 0 -> 383 bytes
.../gpg/PasswordBasedEncryptionFile.txt.gpg | 2 +
.../src/test/resources/crypto/gpg/private.key | 59 +++
.../gobblin/eventhub/EventhubMetricNames.java | 16 +
.../writer/BatchedEventhubDataWriter.java | 16 +
.../writer/EventhubBatchAccumulator.java | 16 +
.../eventhub/writer/EventhubDataWriter.java | 27 +-
.../writer/EventhubDataWriterBuilder.java | 26 +-
.../eventhub/writer/EventhubRequest.java | 26 +-
.../writer/BatchedEventhubDataWriterTest.java | 26 +-
.../writer/EventhubAccumulatorTest.java | 16 +
.../eventhub/writer/EventhubBatchTest.java | 16 +
.../eventhub/writer/EventhubDataWriterTest.java | 26 +-
.../converter/AsyncHttpJoinConverter.java | 16 +
.../converter/AvroApacheHttpJoinConverter.java | 16 +
.../converter/AvroHttpJoinConverter.java | 16 +
.../gobblin/converter/AvroR2JoinConverter.java | 16 +
.../gobblin/converter/HttpJoinConverter.java | 16 +
.../gobblin/http/ApacheHttpAsyncClient.java | 16 +
.../apache/gobblin/http/ApacheHttpClient.java | 16 +
.../gobblin/http/ApacheHttpRequestBuilder.java | 16 +
.../gobblin/http/ApacheHttpResponseHandler.java | 16 +
.../gobblin/http/ApacheHttpResponseStatus.java | 16 +
.../gobblin/http/ThrottledHttpClient.java | 16 +
.../org/apache/gobblin/r2/R2ResponseStatus.java | 16 +
.../apache/gobblin/r2/R2RestRequestBuilder.java | 16 +
.../gobblin/r2/R2RestResponseHandler.java | 16 +
.../org/apache/gobblin/utils/HttpConstants.java | 16 +
.../org/apache/gobblin/utils/HttpUtils.java | 16 +
.../gobblin/writer/AvroHttpWriterBuilder.java | 16 +
.../java/org/apache/gobblin/HttpTestUtils.java | 16 +
.../org/apache/gobblin/MockGenericRecord.java | 16 +
.../http/ApacheHttpRequestBuilderTest.java | 16 +
.../apache/gobblin/r2/R2ClientFactoryTest.java | 16 +
.../gobblin/r2/R2RestRequestBuilderTest.java | 16 +
.../org/apache/gobblin/util/HttpUtilsTest.java | 16 +
.../gobblin/writer/AsyncHttpWriterTest.java | 16 +
.../KafkaSchemaRegistryConfigurationKeys.java | 3 +
.../kafka/schemareg/LiKafkaSchemaRegistry.java | 33 +-
.../writer/KafkaWriterConfigurationKeys.java | 2 +
.../gobblin/kafka/writer/KafkaWriterHelper.java | 1 +
.../metrics/kafka/KafkaAvroSchemaRegistry.java | 11 +
.../reporter/util/KafkaAvroReporterUtil.java | 75 ++++
...thMetadataToEnvelopedRecordWithMetadata.java | 26 +-
.../apache/gobblin/metadata/types/Metadata.java | 16 +
.../apache/gobblin/type/ContentTypeUtils.java | 26 +-
.../apache/gobblin/type/RecordWithMetadata.java | 26 +-
.../type/SerializedRecordWithMetadata.java | 26 +-
.../converter/MetadataConverterWrapperTest.java | 26 +-
...tadataToEnvelopedRecordWithMetadataTest.java | 26 +-
.../metadata/GlobalMetadataCollectorTest.java | 16 +
.../extractor/extract/jdbc/MysqlSource.java | 13 +
.../gobblin/source/jdbc/JdbcExtractor.java | 34 +-
.../gobblin/source/jdbc/JdbcExtractorTest.java | 12 +
.../google/AsyncIteratorWithDataSink.java | 16 +
.../GoggleIngestionConfigurationKeys.java | 16 +
.../ingestion/google/util/SchemaUtil.java | 16 +
.../local/gobblin-oozie-example-workflow.xml | 2 +-
.../config/checkstyle/suppressions.xml | 10 +
.../config/checkstyle/suppressions.xml | 10 +
.../config/checkstyle/suppressions.xml | 10 +
.../config/checkstyle/suppressions.xml | 10 +
.../config/checkstyle/suppressions.xml | 10 +
.../ThrottlingGuiceServletConfig.java | 19 +-
...adoopKerberosKeytabAuthenticationPlugin.java | 19 +-
...adoopKerberosKeytabAuthenticationPlugin.java | 14 +
.../gobblin/runtime/fork/MockTaskContext.java | 19 +-
.../main/java/gobblin/runtime/TaskState.java | 16 +
.../mapreduce/GobblinWorkUnitsInputFormat.java | 33 ++
.../runtime/CheckpointableWatermarkState.java | 26 +-
.../apache/gobblin/runtime/ExecutionModel.java | 26 +-
.../gobblin/runtime/FsDatasetStateStore.java | 84 +++-
...RegTaskStateCollectorServiceHandlerImpl.java | 54 +++
.../org/apache/gobblin/runtime/JobContext.java | 4 +-
.../gobblin/runtime/SafeDatasetCommit.java | 39 +-
.../StateStoreBasedWatermarkStorage.java | 27 +-
.../StateStoreBasedWatermarkStorageCli.java | 26 +-
.../java/org/apache/gobblin/runtime/Task.java | 17 +-
.../gobblin/runtime/TaskConfigurationKeys.java | 29 +-
.../apache/gobblin/runtime/TaskExecutor.java | 233 +++++++++-
.../runtime/TaskInstantiationException.java | 26 +-
.../runtime/TaskStateCollectorService.java | 67 ++-
.../TaskStateCollectorServiceHandler.java | 44 ++
...teCollectorServiceHiveRegHandlerFactory.java | 33 ++
.../runtime/commit/DatasetStateCommitStep.java | 4 +-
.../gobblin/runtime/crypto/DecryptCli.java | 16 +
.../gobblin/runtime/fork/AsynchronousFork.java | 19 +-
.../org/apache/gobblin/runtime/fork/Fork.java | 19 +-
.../gobblin/runtime/fork/SynchronousFork.java | 19 +-
.../FsDatasetStateStoreEntryManager.java | 51 +++
.../runtime/services/JMXReportingService.java | 13 +
.../gobblin/runtime/spec_store/FSSpecStore.java | 4 +-
.../apache/gobblin/runtime/task/FailedTask.java | 16 +
.../apache/gobblin/runtime/task/NoopTask.java | 60 +++
.../runtime/util/JobStateToJsonConverter.java | 34 +-
.../gobblin/runtime/util/StateStores.java | 34 +-
.../gobblin/scheduler/BaseGobblinJob.java | 19 +-
.../apache/gobblin/scheduler/JobScheduler.java | 80 +++-
.../runtime/FsDatasetStateStoreTest.java | 92 ++++
.../gobblin/runtime/JobBrokerInjectionTest.java | 2 +-
.../gobblin/runtime/JobLauncherTestHelper.java | 14 +-
.../gobblin/runtime/LimiterStopEventTest.java | 16 +
.../gobblin/runtime/TaskContinuousTest.java | 26 +-
.../runtime/TaskStateCollectorServiceTest.java | 14 +
.../runtime/commit/CommitSequenceTest.java | 2 +-
.../instance/hadoop/TestHadoopConfigLoader.java | 19 +-
.../FileBasedJobLockFactoryManagerTest.java | 15 +-
.../util/JobStateToJsonConverterTest.java | 38 +-
.../salesforce/SalesforceSourceTest.java | 16 +
gobblin-service/build.gradle | 1 +
.../gobblin/service/ServiceConfigKeys.java | 1 +
.../service/modules/core/GitConfigMonitor.java | 434 +++++++++++++++++++
.../modules/core/GobblinServiceManager.java | 31 ++
.../scheduler/GobblinServiceJobScheduler.java | 10 +
.../modules/core/GitConfigMonitorTest.java | 334 ++++++++++++++
.../modules/core/GobblinServiceManagerTest.java | 76 +++-
.../gobblin/GobblinLocalJobLauncherUtils.java | 16 +
.../TaskSkipErrRecordsIntegrationTest.java | 79 ++++
.../org/apache/gobblin/TestAvroConverter.java | 49 +++
.../org/apache/gobblin/TestAvroExtractor.java | 16 +
.../java/org/apache/gobblin/TestAvroSource.java | 16 +
.../WriterOutputFormatIntegrationTest.java | 16 +
.../task_skip_err_records.properties | 42 ++
.../java/org/apache/gobblin/util/AvroUtils.java | 109 +++++
.../apache/gobblin/util/DatePartitionType.java | 20 +-
.../org/apache/gobblin/util/FileListUtils.java | 50 ++-
.../org/apache/gobblin/util/HadoopUtils.java | 20 +
.../java/org/apache/gobblin/util/HostUtils.java | 16 +
.../java/org/apache/gobblin/util/JvmUtils.java | 16 +
.../java/org/apache/gobblin/util/PathUtils.java | 12 +
.../java/org/apache/gobblin/util/PortUtils.java | 20 +-
.../org/apache/gobblin/util/PullFileLoader.java | 29 +-
.../util/executors/IteratorExecutor.java | 31 ++
.../util/executors/MDCPropagatingCallable.java | 20 +-
.../MDCPropagatingExecutorService.java | 20 +-
.../util/executors/MDCPropagatingRunnable.java | 20 +-
.../MDCPropagatingScheduledExecutorService.java | 20 +-
.../util/hadoop/GobblinSequenceFileReader.java | 49 +++
.../gobblin/util/io/EmptyInputStream.java | 36 ++
.../gobblin/util/limiter/LimiterFactory.java | 3 +-
.../gobblin/util/limiter/MultiLimiter.java | 3 +-
.../gobblin/util/limiter/NoopLimiter.java | 3 +-
.../limiter/broker/SharedLimiterFactory.java | 3 +-
.../util/limiter/broker/SharedLimiterKey.java | 3 +-
.../org/apache/gobblin/util/AvroUtilsTest.java | 23 +
.../apache/gobblin/util/FileListUtilsTest.java | 104 +++++
.../org/apache/gobblin/util/PortUtilsTest.java | 20 +-
.../apache/gobblin/util/PullFileLoaderTest.java | 12 +-
.../gobblin/util/limiter/MultiLimiterTest.java | 2 +-
.../broker/SharedLimiterFactoryTest.java | 2 +-
.../pullFileLoaderTest/dir1/dir1.configuration | 19 +
.../pullFileLoaderTest/dir1/dir1.properties | 18 -
gradle/scripts/dependencyDefinitions.gradle | 1 +
gradle/scripts/testSetup.gradle | 5 +
334 files changed, 8363 insertions(+), 1487 deletions(-)
----------------------------------------------------------------------