You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/30 15:55:27 UTC

[1/9] incubator-gobblin git commit: Azkaban Orchestrator for GaaS

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 90be15f47 -> 9b9fec817


Azkaban Orchestrator for GaaS


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/08e60efd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/08e60efd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/08e60efd

Branch: refs/heads/master
Commit: 08e60efd328485554344b179da2a54466d84628b
Parents: f96379e
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Tue Aug 8 12:41:02 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 8 12:41:02 2017 -0700

----------------------------------------------------------------------
 .../service-azkaban-hello-world.template        |  24 +
 .../orchestration/AzkabanAjaxAPIClient.java     | 435 +++++++++++++++++++
 .../modules/orchestration/AzkabanJobHelper.java | 272 ++++++++++++
 .../orchestration/AzkabanProjectConfig.java     | 123 ++++++
 .../AzkabanSpecExecutorInstance.java            | 106 +++++
 .../AzkabanSpecExecutorInstanceProducer.java    | 158 +++++++
 .../orchestration/ServiceAzkabanConfigKeys.java |  38 ++
 .../main/resources/default-service-azkaban.conf |  33 ++
 .../service-azkaban-hello-world.template        |   0
 9 files changed, 1189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-example/src/main/resources/service-azkaban-hello-world.template
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/service-azkaban-hello-world.template b/gobblin-example/src/main/resources/service-azkaban-hello-world.template
new file mode 100644
index 0000000..36a7418
--- /dev/null
+++ b/gobblin-example/src/main/resources/service-azkaban-hello-world.template
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.service.azkaban.username=<CHANGE ME>
+gobblin.service.azkaban.password=<CHANGE ME>
+gobblin.service.azkaban.server.url=<CHANGE ME>
+gobblin.service.azkaban.project.namePrefix=GobblinService_
+gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
+gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
+gobblin.service.azkaban.project.groupAdmins=""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
new file mode 100644
index 0000000..31fc753
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.commons.codec.net.URLCodec;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.impl.client.BasicCookieStore;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
+
+import com.google.common.base.Splitter;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+
+@Slf4j
+public class AzkabanAjaxAPIClient {
+
+  // Splits comma separated config values, trimming whitespace and dropping empty tokens.
+  // Declared final so the shared splitter cannot be reassigned.
+  private static final Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+
+  // TODO: Ensure GET call urls do not grow too big
+  // Execution window (24 hr clock) and bootstrap delay handed to getScheduledTimeInAzkabanFormat
+  private static final int LOW_NETWORK_TRAFFIC_BEGIN_HOUR = 17;
+  private static final int LOW_NETWORK_TRAFFIC_END_HOUR = 22;
+  private static final int JOB_START_DELAY_MINUTES = 5;
+  private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
+  // Shared URL encoder for request parameters
+  private static final URLCodec codec = new URLCodec();
+
+  /**
+   * Logs in to Azkaban and returns the id of the resulting session.
+   *
+   * @param username Azkaban user name
+   * @param password Azkaban password
+   * @param azkabanServerUrl url the login request is posted to
+   * @return value of the "session.id" field of the response
+   * @throws IOException if the call fails or Azkaban reports an error
+   * @throws EncoderException if the user name or password cannot be URL encoded
+   */
+  public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
+      throws IOException, EncoderException {
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl);
+    // Both credentials are placed in a form-urlencoded body, so both have to be encoded; an unencoded
+    // '&', '=' or '+' in the user name would otherwise corrupt the request
+    StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
+        codec.encode(username), codec.encode(password)));
+    input.setContentType("application/x-www-form-urlencoded");
+    postRequest.setEntity(input);
+    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(postRequest);
+
+    return handleResponse(response, "session.id").get("session.id");
+  }
+
+  /**
+   * Fetches the Azkaban project id of the project named in the given config.
+   *
+   * The fetchprojectflows call is used because it needs no parameter other than the project name and its
+   * response carries the project id.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanProjectConfig supplies the server url and the project name
+   * @return value of the "projectId" field of the response
+   * @throws IOException if the call fails or Azkaban reports an error
+   */
+  public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    String requestUrl = String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&project=%s",
+        azkabanProjectConfig.getAzkabanServerUrl(), sessionId, azkabanProjectConfig.getAzkabanProjectName());
+    HttpGet getRequest = new HttpGet(requestUrl);
+
+    // Client is closed when the method exits
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    Map<String, String> responseFields = handleResponse(httpClient.execute(getRequest), "projectId");
+    return responseFields.get("projectId");
+  }
+
+  /**
+   * Creates a new Azkaban project, registers the configured proxy users and admin groups on it and uploads
+   * the job zip.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param zipFilePath path of the zip file to upload
+   * @param azkabanProjectConfig supplies server url, project name, description, proxy users and admin groups
+   * @return the projectId reported by Azkaban for the upload
+   * @throws IOException if a parameter cannot be encoded, a call fails or Azkaban reports an error
+   */
+  public static String createAzkabanProject(String sessionId, String zipFilePath,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+
+    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
+    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
+
+    // Name and description are free text inside a form-urlencoded body, so they must be encoded; an
+    // unencoded '&' or '=' in the description would otherwise be read as an extra parameter
+    String createForm;
+    try {
+      createForm = String.format("session.id=%s&name=%s&description=%s", sessionId,
+          codec.encode(azkabanProjectName), codec.encode(azkabanProjectDescription));
+    } catch (EncoderException e) {
+      throw new IOException("Could not encode Azkaban project name or description", e);
+    }
+
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
+    StringEntity input = new StringEntity(createForm);
+    input.setContentType("application/x-www-form-urlencoded");
+    postRequest.setEntity(input);
+    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(postRequest);
+    handleResponse(response);
+
+    // Add proxy user if any
+    if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
+      Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
+      for (String user : proxyUsers) {
+        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+      }
+    }
+
+    // Add group permissions if any
+    // TODO: Support users (not just groups), and different permission types
+    // (though we can add users, we only support groups at the moment and award them with admin permissions)
+    // Groups are split the same way as proxy users (trimmed, empty tokens dropped): an untrimmed " b" from
+    // "a, b" would put a raw space into the request url
+    if (StringUtils.isNotBlank(groupAdminUsers)) {
+      for (String group : SPLIT_ON_COMMA.split(groupAdminUsers)) {
+        addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
+            false, false);
+      }
+    }
+
+    // Upload zip file to azkaban and return projectId
+    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+  }
+
+  /**
+   * Updates an existing Azkaban project: changes its description, re-adds the configured proxy users and
+   * admin groups and uploads a new job zip.
+   *
+   * Previously added proxy users and groups are never removed, because Azkaban offers no way to read them.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param zipFilePath path of the zip file to upload
+   * @param azkabanProjectConfig supplies server url, project name, description, proxy users and admin groups
+   * @return the projectId reported by Azkaban for the upload
+   * @throws IOException if a call fails or Azkaban reports an error other than an already existing group
+   */
+  public static String replaceAzkabanProject(String sessionId, String zipFilePath,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+
+    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
+    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
+
+    // Change project description
+    changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+
+    // Add proxy user if any
+    // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
+    //       2. Adding same proxy user multiple times is a non-issue
+    if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
+      Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
+      for (String user : proxyUsers) {
+        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+      }
+    }
+
+    // Add group permissions if any
+    // TODO: Support users (not just groups), and different permission types
+    // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
+    //       2. Adding same group-user will return an error message, but we will ignore it
+    // (though we can add users, we only support groups at the moment and award them with admin permissions)
+    // Groups are split the same way as proxy users (trimmed, empty tokens dropped): an untrimmed " b" from
+    // "a, b" would put a raw space into the request url
+    if (StringUtils.isNotBlank(groupAdminUsers)) {
+      for (String group : SPLIT_ON_COMMA.split(groupAdminUsers)) {
+        try {
+          addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
+              false);
+        } catch (IOException e) {
+          // Ignore if group already exists, we cannot list existing groups; so it is okay to attempt adding
+          // .. existing groups
+          if (!"Group permission already exists.".equalsIgnoreCase(e.getMessage())) {
+            throw e;
+          }
+        }
+      }
+    }
+
+    // Upload zip file to azkaban and return projectId
+    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+  }
+
+  /**
+   * Registers a proxy user on an Azkaban project through the addProxyUser ajax call.
+   * Adding the same proxy user more than once is harmless, Azkaban handles it.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanServerUrl base url of the Azkaban server
+   * @param azkabanProjectName project to add the proxy user to
+   * @param proxyUser name of the proxy user
+   * @throws IOException if the call fails or Azkaban reports an error
+   */
+  private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String proxyUser)
+      throws IOException {
+
+    String requestUrl = String.format("%s/manager?ajax=addProxyUser&session.id=%s&project=%s&name=%s",
+        azkabanServerUrl, sessionId, azkabanProjectName, proxyUser);
+    HttpGet getRequest = new HttpGet(requestUrl);
+
+    // Client is closed when the method exits
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    handleResponse(httpClient.execute(getRequest));
+  }
+
+  /**
+   * Grants permissions on an Azkaban project to a user or group through the addPermission ajax call.
+   *
+   * Existing permissions are not listed beforehand, because Azkaban in its current state only returns user
+   * permissions and not group permissions. Adding a permission that already exists makes Azkaban report an
+   * error.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanServerUrl base url of the Azkaban server
+   * @param azkabanProjectName project to change permissions on
+   * @param name user or group name
+   * @param isGroup value sent as the "group" parameter
+   * @param adminPermission value sent as permissions[admin]
+   * @param readPermission value sent as permissions[read]
+   * @param writePermission value sent as permissions[write]
+   * @param executePermission value sent as permissions[execute]
+   * @param schedulePermission value sent as permissions[schedule]
+   * @throws IOException if the call fails or Azkaban reports an error
+   */
+  private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String name, boolean isGroup, boolean adminPermission, boolean readPermission, boolean writePermission,
+      boolean executePermission, boolean schedulePermission)
+      throws IOException {
+
+    String urlTemplate = "%s/manager?ajax=addPermission&session.id=%s&"
+        + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
+        + "&permissions[execute]=%s&permissions[schedule]=%s";
+    String requestUrl = String.format(urlTemplate, azkabanServerUrl, sessionId, azkabanProjectName, name,
+        isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission);
+    HttpGet getRequest = new HttpGet(requestUrl);
+
+    // Client is closed when the method exits
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    handleResponse(httpClient.execute(getRequest));
+  }
+
+  /**
+   * Uploads a job zip to an Azkaban project as a multipart post and returns the project id from the response.
+   * Obtaining the project id is hard; the upload response is one avenue to get it from Azkaban.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanServerUrl base url of the Azkaban server
+   * @param azkabanProjectName project to upload to; also used to name the uploaded file
+   * @param jobZipFile path of the zip file to upload
+   * @return value of the "projectId" field of the response
+   * @throws IOException if the call fails or Azkaban reports an error
+   */
+  private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String jobZipFile)
+      throws IOException {
+
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+
+    // Assemble the multipart body part by part, in the same order as the parts are sent
+    MultipartEntityBuilder multipart = MultipartEntityBuilder.create();
+    multipart.addTextBody("session.id", sessionId);
+    multipart.addTextBody("ajax", "upload");
+    multipart.addBinaryBody("file", new File(jobZipFile), ContentType.create("application/zip"),
+        azkabanProjectName + ".zip");
+    multipart.addTextBody("project", azkabanProjectName);
+    HttpEntity entity = multipart.build();
+    postRequest.setEntity(entity);
+
+    // Client is closed when the method exits
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    Map<String, String> responseFields = handleResponse(httpClient.execute(postRequest), "projectId");
+    return responseFields.get("projectId");
+  }
+
+  /**
+   * Schedules the configured flow of an Azkaban project through the scheduleFlow ajax call.
+   *
+   * The flow is scheduled to run only once ("is_recurring=off"). The time comes from
+   * getScheduledTimeInAzkabanFormat using the low network traffic window, the date is the current date.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanProjectId id of the project to schedule
+   * @param azkabanProjectConfig supplies server url, project name and flow name
+   * @throws IOException if the call fails or Azkaban reports an error
+   */
+  public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    String serverUrl = azkabanProjectConfig.getAzkabanServerUrl();
+    String projectName = azkabanProjectConfig.getAzkabanProjectName();
+    String flowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+
+    // run only once
+    // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
+    //    if (azkabanProjectConfig.isScheduled()) {
+    //      scheduleString = "is_recurring=on&period=1d"; // schedule once every day
+    //    }
+    String scheduleString = "is_recurring=off";
+
+    HttpPost postRequest = new HttpPost(serverUrl + "/schedule");
+
+    String scheduleTime = getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR,
+        LOW_NETWORK_TRAFFIC_END_HOUR, JOB_START_DELAY_MINUTES);
+    String scheduleDate = getScheduledDateInAzkabanFormat();
+    String formBody = String.format(
+        "session.id=%s&ajax=scheduleFlow&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
+        sessionId, projectName, flowName, azkabanProjectId, scheduleTime, scheduleDate, scheduleString);
+
+    StringEntity input = new StringEntity(formBody);
+    input.setContentType("application/x-www-form-urlencoded");
+    postRequest.setEntity(input);
+    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+    // Client is closed when the method exits
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    handleResponse(httpClient.execute(postRequest));
+  }
+
+  /**
+   * Changes the description of an Azkaban project through the changeDescription ajax call.
+   * The description is URL encoded before it is placed in the request url.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanServerUrl base url of the Azkaban server
+   * @param azkabanProjectName project whose description is changed
+   * @param projectDescription new description
+   * @throws IOException if the description cannot be encoded, the call fails or Azkaban reports an error
+   */
+  private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String projectDescription)
+      throws IOException {
+
+    String encodedDescription;
+    try {
+      encodedDescription = new URLCodec().encode(projectDescription);
+    } catch (EncoderException e) {
+      throw new IOException("Could not encode Azkaban project description", e);
+    }
+
+    String requestUrl = String.format("%s/manager?ajax=changeDescription&session.id=%s&project=%s&description=%s",
+        azkabanServerUrl, sessionId, azkabanProjectName, encodedDescription);
+    HttpGet getRequest = new HttpGet(requestUrl);
+
+    // Client is closed when the method exits
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    handleResponse(httpClient.execute(getRequest));
+  }
+
+  /**
+   * Placeholder. At present this only works out whether the given server url starts with "https://go" or
+   * "http://go"; the result is not used and no request is sent.
+   *
+   * @param uberdistcp2ToolServer url of the tool server, may be blank
+   * @param azkabanProjectConfig currently unused
+   * @throws IOException declared for callers, never thrown by the current body
+   */
+  public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    // Computed but not used yet
+    boolean isGoUrl = !StringUtils.isBlank(uberdistcp2ToolServer)
+        && (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go"));
+  }
+
+  /**
+   * Builds a new http client whose SSL context is loaded with a TrustSelfSignedStrategy and which uses a
+   * fresh BasicCookieStore.
+   *
+   * @return a new client; the caller is responsible for closing it
+   * @throws IOException if the SSL context cannot be created
+   */
+  private static CloseableHttpClient getHttpClient()
+      throws IOException {
+    SSLConnectionSocketFactory sslSocketFactory;
+    try {
+      // Self sign SSL
+      TrustStrategy trustSelfSigned = new TrustSelfSignedStrategy();
+      SSLContextBuilder contextBuilder = new SSLContextBuilder();
+      contextBuilder.loadTrustMaterial(null, trustSelfSigned);
+      sslSocketFactory = new SSLConnectionSocketFactory(contextBuilder.build());
+    } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+      throw new IOException("Issue with creating http client", e);
+    }
+
+    // Create client
+    return HttpClients.custom()
+        .setSSLSocketFactory(sslSocketFactory)
+        .setDefaultCookieStore(new BasicCookieStore())
+        .build();
+  }
+
+  private static Map<String, String> handleResponse(HttpResponse response, String... responseKeys)
+      throws IOException {
+    if (response.getStatusLine().getStatusCode() != 201 && response.getStatusLine().getStatusCode()!= 200) {
+      log.error("Failed : HTTP error code : " + response.getStatusLine().getStatusCode());
+      throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode());
+    }
+
+    // Get response in string
+    InputStream in = response.getEntity().getContent();
+    String jsonResponseString = IOUtils.toString(in, "UTF-8");
+    log.info("Response string: " + jsonResponseString);
+
+    // Parse Json
+    Map<String, String> responseMap = new HashMap<>();
+    if (StringUtils.isNotBlank(jsonResponseString)) {
+      JsonObject jsonObject = new JsonParser().parse(jsonResponseString).getAsJsonObject();
+
+      // Handle error if any
+      handleResponseError(jsonObject);
+
+      // Get required responseKeys
+      if (ArrayUtils.isNotEmpty(responseKeys)) {
+        for (String responseKey : responseKeys) {
+          responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
+        }
+      }
+    }
+
+    return responseMap;
+  }
+
+  private static void handleResponseError(JsonObject jsonObject) throws IOException {
+    // Azkaban does not has a standard for error messages tag
+    if (null != jsonObject.get("status") && "error".equalsIgnoreCase(jsonObject.get("status").toString()
+        .replaceAll("\"", ""))) {
+      String message = (null != jsonObject.get("message")) ?
+          jsonObject.get("message").toString().replaceAll("\"", "") : "Issue in creating project";
+      throw new IOException(message);
+    }
+
+    if (null != jsonObject.get("error")) {
+      String error = jsonObject.get("error").toString().replaceAll("\"", "");
+      throw new IOException(error);
+    }
+  }
+
+  /***
+   * Generate a scheduled time in the Azkaban compatible format which is: hh,mm,a,z Eg. ScheduleTime=12,00,PM,PDT
+   *
+   * If the current time lies inside the execution window, the result is the current time plus delayMinutes.
+   * Otherwise the result is a random time inside the window.
+   *
+   * @param windowStartHour Window start hour in 24 hr (HH) format (inclusive)
+   * @param windowEndHour Window end hour in 24 hr (HH) format (exclusive)
+   * @param delayMinutes If current time is within window, then additional delay for bootstrapping if desired
+   * @return Scheduled time string of the format hh,mm,a,z
+   * @throws IllegalArgumentException if the window is not 0 <= start < end <= 23 or the delay is not 0..59
+   */
+  public static String getScheduledTimeInAzkabanFormat(int windowStartHour, int windowEndHour, int delayMinutes) {
+    // Validate
+    if (windowStartHour < 0 || windowEndHour > 23 || windowStartHour >= windowEndHour) {
+      throw new IllegalArgumentException("Window start should be less than window end, and both should be between "
+          + "0 and 23");
+    }
+    if (delayMinutes < 0 || delayMinutes > 59) {
+      throw new IllegalArgumentException("Delay in minutes should be between 0 and 59 (inclusive)");
+    }
+
+    // Setup window
+    Calendar windowStartTime = Calendar.getInstance();
+    windowStartTime.set(Calendar.HOUR_OF_DAY, windowStartHour);
+    windowStartTime.set(Calendar.MINUTE, 0);
+    windowStartTime.set(Calendar.SECOND, 0);
+
+    Calendar windowEndTime = Calendar.getInstance();
+    windowEndTime.set(Calendar.HOUR_OF_DAY, windowEndHour);
+    windowEndTime.set(Calendar.MINUTE, 0);
+    windowEndTime.set(Calendar.SECOND, 0);
+
+    // Check if current time is between windowStartTime and windowEndTime, then let the execution happen
+    // after delayMinutes minutes
+    Calendar now = Calendar.getInstance();
+    if (now.after(windowStartTime) && now.before(windowEndTime)) {
+      // Azkaban takes a few seconds / a minute to bootstrap,
+      // so extra few minutes get the first execution to run instantly
+      now.add(Calendar.MINUTE, delayMinutes);
+
+      return new SimpleDateFormat("hh,mm,a,z").format(now.getTime());
+    }
+
+    // Current time is not between windowStartTime and windowEndTime, so pick a random time inside the window.
+    // The window length is taken from the validated hour arguments, not from the two Calendar instances:
+    // those are created at slightly different instants, so their millisecond fields can differ and the
+    // millisecond difference could truncate to one hour less (zero for a one hour window, which would make
+    // Random.nextInt throw).
+    int allowedSchedulingWindow = windowEndHour - windowStartHour;
+    // One generator for both draws; two generators seeded with the same timestamp give correlated values
+    Random random = new Random();
+    int randomHourInWindow = random.nextInt(allowedSchedulingWindow);
+    int randomMinute = random.nextInt(60);
+    windowStartTime.add(Calendar.HOUR, randomHourInWindow);
+    windowStartTime.set(Calendar.MINUTE, randomMinute);
+
+    return new SimpleDateFormat("hh,mm,a,z").format(windowStartTime.getTime());
+  }
+
+  /**
+   * Formats the current date as MM/dd/yyyy, the form used for the scheduleDate parameter.
+   * Eg. ScheduleDate=07/22/2014
+   *
+   * @return the current date as MM/dd/yyyy
+   */
+  private static String getScheduledDateInAzkabanFormat() {
+    SimpleDateFormat azkabanDateFormat = new SimpleDateFormat("MM/dd/yyyy");
+    Date today = new Date();
+    return azkabanDateFormat.format(today);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
new file mode 100644
index 0000000..627761e
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Lists;
+
+
+@Slf4j
+public class AzkabanJobHelper {
+
+  /**
+   * Tells whether the Azkaban project named in the config exists.
+   *
+   * Azkaban offers no direct existence check, so the project id is requested and the outcome interpreted:
+   * a non-blank id or a "need READ access" error means the project exists, a "doesn't exist" error means
+   * it does not.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanProjectConfig supplies the project name
+   * @return true if the project exists
+   * @throws IOException for any error other than the two interpreted above
+   */
+  public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
+
+    String projectId;
+    try {
+      projectId = AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig);
+    } catch (IOException e) {
+      String errorMessage = e.getMessage();
+
+      // Project doesn't exist
+      String missingProjectMessage =
+          String.format("Project %s doesn't exist.", azkabanProjectConfig.getAzkabanProjectName());
+      if (missingProjectMessage.equalsIgnoreCase(errorMessage)) {
+        log.info("Project does not exists.");
+        return false;
+      }
+
+      // Project exists but the current user has no read access
+      if ("Permission denied. Need READ access.".equalsIgnoreCase(errorMessage)) {
+        log.info("Project exists, but current user does not has READ access.");
+        return true;
+      }
+
+      // Some other error
+      log.error("Issue in checking if project is present", e);
+      throw e;
+    }
+
+    boolean isPresent = StringUtils.isNotBlank(projectId);
+    log.info("Project exists: " + isPresent);
+    return isPresent;
+  }
+
+  /**
+   * Fetches the Azkaban project id through AzkabanAjaxAPIClient and logs it.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanProjectConfig supplies the project name
+   * @return the project id returned by AzkabanAjaxAPIClient
+   * @throws IOException if the lookup fails
+   */
+  public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    String projectName = azkabanProjectConfig.getAzkabanProjectName();
+    log.info("Getting project Id for project: " + projectName);
+
+    String id = AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig);
+    log.info("Project id: " + id);
+    return id;
+  }
+
+  /**
+   * Builds the job zip for the configured project and creates the project on Azkaban with it.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanProjectConfig configuration of the project to create
+   * @return the project id returned by AzkabanAjaxAPIClient
+   * @throws IOException if the zip cannot be built or the project cannot be created
+   */
+  public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    String projectName = azkabanProjectConfig.getAzkabanProjectName();
+    log.info("Creating Azkaban project for: " + projectName);
+
+    // Build the zip, then hand it to Azkaban
+    String zipPath = createAzkabanJobZip(azkabanProjectConfig);
+    log.info("Zip file path: " + zipPath);
+
+    String createdProjectId = AzkabanAjaxAPIClient.createAzkabanProject(sessionId, zipPath, azkabanProjectConfig);
+    log.info("Project Id: " + createdProjectId);
+    return createdProjectId;
+  }
+
+  /**
+   * Builds a fresh job zip for the configured project and replaces the project's content on Azkaban with it.
+   *
+   * @param sessionId session id of an authenticated Azkaban session
+   * @param azkabanProjectId id of the existing project; not used by the current body
+   * @param azkabanProjectConfig configuration of the project to replace
+   * @return the project id returned by AzkabanAjaxAPIClient
+   * @throws IOException if the zip cannot be built or the project cannot be replaced
+   */
+  public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    String projectName = azkabanProjectConfig.getAzkabanProjectName();
+    log.info("Replacing zip for Azkaban project: " + projectName);
+
+    // Build the zip, then hand it to Azkaban
+    String zipPath = createAzkabanJobZip(azkabanProjectConfig);
+    log.info("Zip file path: " + zipPath);
+
+    String replacedProjectId = AzkabanAjaxAPIClient.replaceAzkabanProject(sessionId, zipPath, azkabanProjectConfig);
+    log.info("Project Id: " + replacedProjectId);
+    return replacedProjectId;
+  }
+
+  public static void scheduleJob(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Scheduling Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+    AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+
+  public static void changeJobSchedule(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Changing schedule for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+    AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+
+  /**
+   * Assembles the zip file that is uploaded to Azkaban for the configured project.
+   *
+   * Downloads the configured job jars and additional files into the work directory, writes the Azkaban
+   * config files, and zips the downloaded files together with the first config file.
+   *
+   * @param azkabanProjectConfig supplies work dir, jar url template, jar names and version, additional
+   *        files, flow name and zip file name
+   * @return path of the zip file that was written
+   * @throws IOException if a download fails while failIfJarNotFound is set, or writing the files fails
+   */
+  public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
+    String workDir = azkabanProjectConfig.getWorkDir();
+
+    Optional<String> jarUrlTemplate = azkabanProjectConfig.getAzkabanZipJarUrlTemplate();
+    Optional<List<String>> jarNames = azkabanProjectConfig.getAzkabanZipJarNames();
+    Optional<String> jarVersion = azkabanProjectConfig.getAzkabanZipJarVersion();
+    Optional<List<String>> additionalFiles = azkabanProjectConfig.getAzkabanZipAdditionalFiles();
+    boolean failIfJarNotFound = azkabanProjectConfig.getFailIfJarNotFound();
+    String jobFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+    String zipFilename = azkabanProjectConfig.getAzkabanProjectZipFilename();
+
+    // Download the job jars
+    List<File> filesToAdd = Lists.newArrayList();
+    if (jarNames.isPresent() && jarUrlTemplate.isPresent() && jarVersion.isPresent()) {
+      String urlTemplate = jarUrlTemplate.get();
+      String version = jarVersion.get();
+      for (String jarName : jarNames.get()) {
+        // Literal replacement: replaceAll would treat '$' or '\' in the version or jar name as regex
+        // replacement syntax
+        String jobJarUrl = urlTemplate.replace("<module-version>", version).replace("<module-name>", jarName);
+        log.info("Downloading job jar from: " + jobJarUrl + " to: " + workDir);
+        try {
+          filesToAdd.add(downloadAzkabanJobJar(workDir, jobJarUrl));
+        } catch (IOException e) {
+          if (failIfJarNotFound) {
+            throw e;
+          }
+          // Log the url that failed (no File exists when the download throws) and keep the cause
+          log.warn("Could not download: " + jobJarUrl, e);
+        }
+      }
+    }
+
+    // Download additional files
+    if (additionalFiles.isPresent()) {
+      for (String fileName : additionalFiles.get()) {
+        log.info("Downloading additional file from: " + fileName + " to: " + workDir);
+        try {
+          filesToAdd.add(downloadAzkabanJobJar(workDir, fileName));
+        } catch (IOException e) {
+          if (failIfJarNotFound) {
+            throw e;
+          }
+          // Log the location that failed (no File exists when the download throws) and keep the cause
+          log.warn("Could not download: " + fileName, e);
+        }
+      }
+    }
+
+    // Write the config files; only the first returned file is added to the zip
+    log.info("Writing Azkaban config files");
+    File [] jobConfigFile = writeAzkabanConfigFiles(workDir, jobFlowName, azkabanProjectConfig);
+    filesToAdd.add(jobConfigFile[0]);
+
+    // Create the zip file
+    log.info("Writing zip file");
+    String zipfile = createZipFile(workDir, zipFilename, filesToAdd);
+    log.info("Wrote zip file: " + zipfile);
+
+    return zipfile;
+  }
+
+  /**
+   * Creates {@code <directory>/<zipFilename>} containing the given files.
+   *
+   * <p>Any file already present at the target path is deleted first; the archive content is
+   * then written by {@code addFilesToZip}.
+   *
+   * @param directory directory in which the zip file is created
+   * @param zipFilename name of the zip file
+   * @param filesToAdd files to place in the archive
+   * @return path of the zip file
+   * @throws IOException if writing the archive fails
+   */
+  private static String createZipFile(String directory, String zipFilename, List<File> filesToAdd)
+      throws IOException {
+    String targetPath = directory + "/" + zipFilename;
+
+    // Remove a stale archive, if any, before writing the new one
+    File target = new File(targetPath);
+    target.delete();
+
+    addFilesToZip(target, filesToAdd);
+    return targetPath;
+  }
+
+  /**
+   * Writes every file in {@code filesToAdd} into {@code zipFile} as a zip archive.
+   *
+   * <p>Each entry is named after the file's simple name ({@code File.getName()}), so the archive
+   * has no directory structure.
+   * NOTE(review): two inputs with the same file name would map to the same entry name - confirm
+   * callers never pass such a list.
+   *
+   * @param zipFile destination archive file
+   * @param filesToAdd files whose content is copied into the archive
+   * @throws IOException if reading an input or writing the archive fails; an
+   *         {@code ArchiveException} from the archive factory is wrapped in an {@code IOException}
+   */
+  private static void addFilesToZip(File zipFile, List<File> filesToAdd) throws IOException {
+    try {
+      // Lombok @Cleanup closes these two streams when the try block's scope ends
+      @Cleanup
+      OutputStream archiveStream = new FileOutputStream(zipFile);
+      @Cleanup
+      ArchiveOutputStream archive =
+          new ArchiveStreamFactory().createArchiveOutputStream(ArchiveStreamFactory.ZIP, archiveStream);
+
+      for (File fileToAdd : filesToAdd) {
+        // One archive entry per file, keyed by the bare file name
+        ZipArchiveEntry entry = new ZipArchiveEntry(fileToAdd.getName());
+        archive.putArchiveEntry(entry);
+
+        // The input stream is scoped to this loop iteration, so it is closed per file
+        @Cleanup
+        BufferedInputStream input = new BufferedInputStream(new FileInputStream(fileToAdd));
+        IOUtils.copy(input, archive);
+        archive.closeArchiveEntry();
+      }
+
+      archive.finish();
+    } catch (ArchiveException e) {
+      throw new IOException("Issue with creating archive", e);
+    }
+  }
+
+  /**
+   * Writes the job spec's properties to {@code <workDir>/<flowName>.job}.
+   *
+   * <p>Any file already at that path is deleted first. Every property of the job spec becomes
+   * one {@code key=value} line, and the content is written as UTF-8.
+   *
+   * @param workDir directory in which the job file is written
+   * @param flowName flow name, used as the base name of the job file
+   * @param azkabanProjectConfig project configuration whose job spec supplies the properties
+   * @return a one-element array holding the job file
+   * @throws IOException if the job file cannot be written
+   */
+  private static File[] writeAzkabanConfigFiles(String workDir, String flowName, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    // Start from a clean slate: drop any job file left at the target path
+    File jobFile = new File(String.format("%s/%s.job", workDir, flowName));
+    jobFile.delete();
+
+    // Render every property as one "key=value" line
+    StringBuilder jobFileContent = new StringBuilder();
+    for (Map.Entry<?, ?> property : azkabanProjectConfig.getJobSpec().getConfigAsProperties().entrySet()) {
+      String line = String.format("%s=%s", property.getKey(), property.getValue());
+      jobFileContent.append(line).append("\n");
+    }
+
+    FileUtils.writeStringToFile(jobFile, jobFileContent.toString(), Charset.forName("UTF-8"), true);
+
+    File[] writtenFiles = {jobFile};
+    return writtenFiles;
+  }
+
+  /**
+   * Downloads the resource at {@code jobJarUrl} into {@code workDir}.
+   *
+   * <p>The local file is named after the last {@code /}-separated segment of the (trimmed) URL.
+   * A file already present under that name is deleted, and the work directory is created if it
+   * does not exist yet.
+   *
+   * @param workDir directory that receives the downloaded file
+   * @param jobJarUrl URL to download from
+   * @return the downloaded local file
+   * @throws IOException if the directory cannot be created or the download fails
+   */
+  private static File downloadAzkabanJobJar(String workDir, String jobJarUrl)
+      throws IOException {
+    // Derive the local file name from the last path segment of the URL
+    String[] urlSegments = jobJarUrl.trim().split("/");
+    String fileName = urlSegments[urlSegments.length - 1];
+    File targetFile = new File(workDir + "/" + fileName);
+    targetFile.delete();
+
+    // Create the work directory if it does not exist yet
+    FileUtils.forceMkdir(new File(workDir));
+
+    // Stream the remote content into the local file; Lombok closes both streams on scope exit
+    @Cleanup InputStream remoteStream = new URL(jobJarUrl).openStream();
+    @Cleanup OutputStream localStream = new FileOutputStream(targetFile);
+    IOUtils.copy(remoteStream, localStream);
+
+    // TODO: compare checksum
+
+    return targetFile;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
new file mode 100644
index 0000000..ddae3d9
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.List;
+import java.util.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Holds the Azkaban-project specific configuration derived from a {@link JobSpec}.
+ *
+ * <p>Values are read from the job spec's config, falling back to the defaults in
+ * {@code default-service-azkaban.conf}. Getters are generated by Lombok.
+ */
+@Getter
+@ToString
+@AllArgsConstructor
+@Builder(builderMethodName = "hiddenBuilder")
+public class AzkabanProjectConfig {
+  private static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
+
+  // Azkaban does not support names longer than this
+  private static final int MAX_PROJECT_NAME_LENGTH = 64;
+  // Prefix kept from an over-long name: 53 chars + '_' + at most 10 hash digits = 64
+  private static final int TRIMMED_PREFIX_LENGTH = 53;
+
+  private final String azkabanServerUrl;
+
+  private final String azkabanProjectName;
+  private final String azkabanProjectDescription;
+  private final String azkabanProjectFlowName;
+  private final String azkabanGroupAdminUsers;
+  private final Optional<String> azkabanUserToProxy;
+
+  private final Optional<List<String>> azkabanZipJarNames;
+  private final Optional<String> azkabanZipJarUrlTemplate;
+  private final Optional<String> azkabanZipJarVersion;
+  private final Optional<List<String>> azkabanZipAdditionalFiles;
+  // Deliberately the boxed type: Lombok then names the getter getFailIfJarNotFound(), which
+  // callers use; a primitive boolean would rename it to isFailIfJarNotFound()
+  private final Boolean failIfJarNotFound;
+
+  private final JobSpec jobSpec;
+
+  /**
+   * Builds the project configuration for {@code jobSpec}.
+   *
+   * @param jobSpec job spec whose config (with the packaged defaults as fallback) supplies all
+   *        values; its URI is used to derive the project name
+   */
+  public AzkabanProjectConfig(JobSpec jobSpec) {
+    // Extract config objects
+    this.jobSpec = jobSpec;
+    Config defaultConfig = ConfigFactory.load(DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+    Config config  = jobSpec.getConfig().withFallback(defaultConfig);
+
+    // Azkaban Infrastructure
+    this.azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+    // Azkaban Project Metadata
+    this.azkabanProjectName = constructProjectName(jobSpec, config);
+    this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
+    this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
+    this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+    this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
+
+    // Azkaban Project Zip
+    this.azkabanZipJarNames = Optional.ofNullable(ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY));
+    this.azkabanZipJarUrlTemplate = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY, null));
+    this.azkabanZipJarVersion = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY, null));
+    this.azkabanZipAdditionalFiles = Optional.ofNullable(
+        ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY));
+    this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false);
+  }
+
+  /**
+   * Derives the project name as {@code <prefix>_<postfix>}, trimmed to the Azkaban limit.
+   *
+   * <p>The prefix comes from the config (empty when unset). The postfix is the job spec URI with
+   * underscores turned into {@code -} and every remaining character outside
+   * {@code [A-Za-z0-9-]} turned into {@code _}; it is empty when the spec has no URI.
+   */
+  private String constructProjectName(JobSpec jobSpec, Config config) {
+    String projectNamePrefix = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_NAME_PREFIX_KEY, "");
+    String projectNamePostfix = null == jobSpec.getUri() ? "" :
+        jobSpec.getUri().toString().replaceAll("_", "-").replaceAll("[^A-Za-z0-9\\-]", "_");
+
+    return trimProjectName(String.format("%s_%s", projectNamePrefix, projectNamePostfix));
+  }
+
+  /***
+   * Get Azkaban project zip file name
+   * @return Azkaban project zip file name
+   */
+  public String getAzkabanProjectZipFilename() {
+    return String.format("%s.zip", azkabanProjectName);
+  }
+
+  /***
+   * Get Azkaban project working directory, located under the JVM's {@code user.dir} and suffixed
+   * with the current time in milliseconds. Because of that suffix, separate calls can return
+   * different directories; callers that need one consistent directory should call this once
+   * and reuse the result.
+   * @return Azkaban project working directory
+   */
+  public String getWorkDir() {
+    return String.format("%s/%s/%s/%s", System.getProperty("user.dir"), "serviceAzkaban", azkabanProjectName, System.currentTimeMillis());
+  }
+
+  /**
+   * Shortens {@code projectName} to at most 64 characters.
+   *
+   * <p>Names within the limit are returned unchanged. Longer names keep their first 53
+   * characters followed by {@code _} and the absolute value of the name's hash code.
+   */
+  private static String trimProjectName(String projectName) {
+    if (projectName.length() > MAX_PROJECT_NAME_LENGTH) {
+      // We are using string.hashcode() so that for the same path the generated project name is the
+      // same (and hence checking for path duplicates is deterministic). Using UUID or
+      // currentMillis would produce a different shortened name for the same path every time.
+      // Widen to long before taking the absolute value: negating Integer.MIN_VALUE as an int
+      // overflows and stays negative, which would yield a 65-character name containing '-'.
+      long pathHash = Math.abs((long) projectName.hashCode());
+      projectName = String.format("%s_%s", projectName.substring(0, TRIMMED_PREFIX_LENGTH), pathHash);
+    }
+
+    return projectName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
new file mode 100644
index 0000000..65209c3
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SpecExecutorInstance;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+
+/**
+ * {@link SpecExecutorInstance} for Azkaban, configured from a {@link Config}.
+ *
+ * <p>It exposes its URI, config and capabilities as already-completed futures. The start-up and
+ * shut-down hooks of the idle service do nothing here.
+ */
+public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
+  protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+  protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
+
+  // Executor Instance
+  protected final Config _config;
+  protected final Logger _log;
+  protected final URI _specExecutorInstanceUri;
+  // Capabilities parsed from config, keyed by source with the destination as value
+  protected final Map<String, String> _capabilities;
+
+  /**
+   * Creates the instance from {@code config}.
+   *
+   * <p>The instance URI is read from the config and defaults to {@code "NA"}. Capabilities, when
+   * configured, are a comma-separated list of {@code source:destination} pairs.
+   *
+   * @param config executor instance configuration
+   * @param log logger to use; when absent, a logger for the runtime class is created
+   * @throws RuntimeException if the configured URI is not syntactically valid
+   * @throws IllegalArgumentException if a capability is not exactly one
+   *         {@code source:destination} pair
+   */
+  public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) {
+    _config = config;
+    _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
+    try {
+      _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
+          "NA"));
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    _capabilities = Maps.newHashMap();
+    if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
+      String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
+      List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr);
+      for (String capability : capabilities) {
+        List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
+        Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported "
+            + "per capability, found: " + currentCapability);
+        _capabilities.put(currentCapability.get(0), currentCapability.get(1));
+      }
+    }
+  }
+
+  /** Returns the URI of this executor instance. */
+  @Override
+  public URI getUri() {
+    return _specExecutorInstanceUri;
+  }
+
+  /** Returns a completed future with a short description naming this class and its URI. */
+  @Override
+  public Future<String> getDescription() {
+    // Previously reported "SimpleSpecExecutorInstance", a leftover from the class this was modelled on
+    return new CompletedFuture<>("AzkabanSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null);
+  }
+
+  /** Returns a completed future with the config this instance was created from. */
+  @Override
+  public Future<Config> getConfig() {
+    return new CompletedFuture<>(_config, null);
+  }
+
+  /** Returns a completed future with the constant {@code "Healthy"}; no check is performed. */
+  @Override
+  public Future<String> getHealth() {
+    return new CompletedFuture<>("Healthy", null);
+  }
+
+  /** Returns a completed future with the capabilities parsed at construction time. */
+  @Override
+  public Future<? extends Map<String, String>> getCapabilities() {
+    return new CompletedFuture<>(_capabilities, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // nothing to do in default implementation
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // nothing to do in default implementation
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
new file mode 100644
index 0000000..f093af2
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+
+/**
+ * {@link SpecExecutorInstanceProducer} that materializes specs as Azkaban projects.
+ *
+ * <p>A session with the Azkaban server is opened in the constructor and its id is reused for
+ * every call. Adding a spec creates and schedules a project; updating a spec replaces the
+ * project content and re-applies the schedule. Deleting and listing specs are not supported.
+ */
+public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInstance
+    implements SpecExecutorInstanceProducer<Spec>, Closeable {
+
+  // Session Id for GaaS User
+  private String sessionId;
+
+
+  /**
+   * Creates the producer and authenticates against Azkaban.
+   *
+   * @param config configuration holding the Azkaban username, password and server URL
+   * @param log logger to use, if any
+   * @throws RuntimeException if authentication with Azkaban fails
+   */
+  public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
+    super(config, log);
+
+    try {
+      // Initialize Azkaban client / producer and cache credentials
+      String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+      String azkabanPassword = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+      String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+      sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+    } catch (IOException | EncoderException e) {
+      throw new RuntimeException("Could not authenticate with Azkaban", e);
+    }
+  }
+
+  /** Constructor with an explicit logger. */
+  public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor without an explicit logger; the superclass then creates a default one. */
+  public AzkabanSpecExecutorInstanceProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to release: this class only holds the session id string
+  }
+
+  /**
+   * Creates and schedules an Azkaban project for {@code addedSpec}.
+   *
+   * <p>If Azkaban reports that the project already exists, the project is replaced when the
+   * spec's config enables overwrite-if-exists; otherwise the existing project is left untouched
+   * and only logged.
+   *
+   * @param addedSpec spec to add; must be a {@link JobSpec}
+   * @return a completed future (never {@code null}) carrying this instance's config
+   * @throws RuntimeException if setting up the project fails
+   */
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    JobSpec jobSpec = (JobSpec) addedSpec;
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
+    try {
+      _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
+
+      // Deleted project also returns true if-project-exists check, so optimistically first create the project
+      // .. (it will create project if it was never created or deleted), if project exists it will fail with
+      // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
+      // .. specified
+      try {
+        createNewAzkabanProject(sessionId, azkabanProjectConfig);
+      } catch (IOException e) {
+        if (!"Project already exists.".equalsIgnoreCase(e.getMessage())) {
+          throw e;
+        }
+        if (ConfigUtils.getBoolean(jobSpec.getConfig(),
+            ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+          _log.info("Project already exists for this Spec, but force overwrite specified");
+          updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+        } else {
+          _log.info(String.format("Azkaban project already exists: %smanager?project=%s",
+              azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    // Return a real future, as updateSpec does, so callers can safely call get() on the result
+    return new CompletedFuture<>(_config, null);
+  }
+
+  /**
+   * Replaces the Azkaban project for {@code updatedSpec} and re-applies its schedule.
+   *
+   * @param updatedSpec spec to update; must be a {@link JobSpec}
+   * @return a completed future carrying this instance's config
+   * @throws RuntimeException if updating the project fails
+   */
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    // Re-create project
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
+
+    try {
+      updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    return new CompletedFuture<>(_config, null);
+  }
+
+  /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+    // Delete project
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Creates the Azkaban project for the given config and then schedules it. */
+  private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    // Create Azkaban Job
+    String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig);
+
+    // Schedule Azkaban Job
+    AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    _log.info(String.format("Azkaban project created: %smanager?project=%s",
+        azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+  }
+
+  /** Looks up the existing project's id, replaces its job and changes its schedule. */
+  private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    _log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName()));
+
+    // Get project Id
+    String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig);
+
+    // Replace Azkaban Job
+    AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    // Change schedule
+    AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
new file mode 100644
index 0000000..762561c
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Configuration keys used by the Azkaban orchestrator of Gobblin Service.
+ *
+ * <p>Every key is the common prefix {@code gobblin.service.azkaban.} followed by a
+ * key-specific suffix.
+ */
+public class ServiceAzkabanConfigKeys {
+  public static final String GOBBLIN_SERVICE_AZKABAN_PREFIX = "gobblin.service.azkaban.";
+
+  // Azkaban session: credentials and server location
+  public static final String AZKABAN_USERNAME_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "username";
+  public static final String AZKABAN_PASSWORD_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "password";
+  public static final String AZKABAN_SERVER_URL_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "server.url";
+
+  // Azkaban project metadata
+  public static final String AZKABAN_PROJECT_NAME_PREFIX_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.namePrefix";
+  public static final String AZKABAN_PROJECT_DESCRIPTION_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.description";
+  public static final String AZKABAN_PROJECT_FLOW_NAME_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.flowName";
+  public static final String AZKABAN_PROJECT_GROUP_ADMINS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.groupAdmins";
+  public static final String AZKABAN_PROJECT_USER_TO_PROXY_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.userToProxy";
+  public static final String AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.overwriteIfExists";
+
+  // Azkaban project zip: which jars and extra files go into the uploaded archive
+  public static final String AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.jarUrlTemplate";
+  public static final String AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.jarNames";
+  public static final String AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.jarVersion";
+  public static final String AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.additionalFilesUrl";
+  public static final String AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.failIfJarNotFound";
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
new file mode 100644
index 0000000..01a26f1
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default values
+# These values should generally come from the template being used by the Service
+gobblin.service.azkaban.project.namePrefix=GobblinService_
+gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
+gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
+
+gobblin.service.azkaban.project.zip.jarUrlTemplate=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplate}
+gobblin.service.azkaban.project.zip.jarNames=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules}
+gobblin.service.azkaban.project.zip.jarVersion=${gobblin.service.azkaban.project.job.jar.mavenGobblinVersion}
+
+gobblin.service.azkaban.project.zip.failIfJarNotFound=false
+gobblin.service.azkaban.project.zip.additionalFilesUrl=""
+
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-config-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
+gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template b/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template
new file mode 100644
index 0000000..e69de29


[9/9] incubator-gobblin git commit: Merge pull request #2085 from abti/service_azkaban_orchestrator

Posted by ab...@apache.org.
Merge pull request #2085 from abti/service_azkaban_orchestrator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9b9fec81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9b9fec81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9b9fec81

Branch: refs/heads/master
Commit: 9b9fec8176007ff985a69639dd058035606c93fc
Parents: 90be15f b0c96ac
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 08:55:18 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 08:55:18 2017 -0700

----------------------------------------------------------------------
 conf/service/log4j-cluster.properties           |  27 -
 conf/service/log4j-service.properties           |  27 +
 .../service-azkaban-hello-world.template        |  24 +
 gobblin-modules/gobblin-azkaban/build.gradle    |   5 +
 .../orchestration/AzkabanAjaxAPIClient.java     | 507 +++++++++++++++++++
 .../modules/orchestration/AzkabanJobHelper.java | 362 +++++++++++++
 .../orchestration/AzkabanProjectConfig.java     | 128 +++++
 .../AzkabanSpecExecutorInstance.java            | 108 ++++
 .../AzkabanSpecExecutorInstanceProducer.java    | 176 +++++++
 .../orchestration/ServiceAzkabanConfigKeys.java |  42 ++
 .../main/resources/default-service-azkaban.conf |  41 ++
 .../orchestration/AzkabanAjaxAPIClientTest.java |  97 ++++
 .../orchestration/AzkabanProjectConfigTest.java | 109 ++++
 .../src/test/resources/reference.conf           |  15 +
 .../service-azkaban-hello-world.template        |   0
 .../gobblin/service/ServiceConfigKeys.java      |   2 +
 .../modules/core/GobblinServiceManager.java     |   1 +
 gradle/scripts/dependencyDefinitions.gradle     |   1 +
 18 files changed, 1645 insertions(+), 27 deletions(-)
----------------------------------------------------------------------



[7/9] incubator-gobblin git commit: Tests for Azkaban Orchestrator

Posted by ab...@apache.org.
Tests for Azkaban Orchestrator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ee3e5481
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ee3e5481
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ee3e5481

Branch: refs/heads/master
Commit: ee3e5481a1ef538fd5d5610e30254cb1fc80cad1
Parents: 0bb5139
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 04:43:20 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 04:43:20 2017 -0700

----------------------------------------------------------------------
 gobblin-modules/gobblin-azkaban/build.gradle    |   3 +
 .../orchestration/AzkabanAjaxAPIClient.java     |   7 +-
 .../orchestration/AzkabanProjectConfig.java     |  11 +-
 .../orchestration/AzkabanAjaxAPIClientTest.java |  97 +++++++++++++++++
 .../orchestration/AzkabanProjectConfigTest.java | 109 +++++++++++++++++++
 .../src/test/resources/reference.conf           |  15 +++
 6 files changed, 238 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/build.gradle b/gobblin-modules/gobblin-azkaban/build.gradle
index 4bebecc..2f1dde6 100644
--- a/gobblin-modules/gobblin-azkaban/build.gradle
+++ b/gobblin-modules/gobblin-azkaban/build.gradle
@@ -41,6 +41,9 @@ dependencies {
   compile externalDependency.typesafeConfig
   compile externalDependency.hadoopYarnApi
   compile externalDependency.findBugsAnnotations
+
+  testCompile externalDependency.mockito
+  testCompile externalDependency.powermock
 }
 
 test {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index d0b8471..90bf005 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -51,6 +51,7 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Maps;
 import com.google.gson.JsonElement;
@@ -345,14 +346,16 @@ public class AzkabanAjaxAPIClient {
     return postRequest;
   }
 
-  private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
+  @VisibleForTesting
+  protected static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
     HttpResponse response = httpClient.execute(getRequest);
     return handleResponse(response);
   }
 
-  private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
+  @VisibleForTesting
+  protected static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
     HttpResponse response = httpClient.execute(postRequest);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index 2bac65d..b99683d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -24,6 +24,7 @@ import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -76,8 +77,14 @@ public class AzkabanProjectConfig {
     this.azkabanZipJarNames = Optional.ofNullable(ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY));
     this.azkabanZipJarUrlTemplate = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY, null));
     this.azkabanZipJarVersion = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY, null));
-    this.azkabanZipAdditionalFiles = Optional.ofNullable(
-        ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY));
+    if (config.hasPath(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY) &&
+        StringUtils.isNotBlank(config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY))) {
+      this.azkabanZipAdditionalFiles = Optional.ofNullable(
+          ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY));
+    } else {
+      this.azkabanZipAdditionalFiles = Optional.empty();
+    }
+
     this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java
new file mode 100644
index 0000000..d7dda91
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClientTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Slf4j
+@Test(groups = { "org.apache.gobblin.service.modules.orchestration" })
+public class AzkabanAjaxAPIClientTest {
+
+  @Test
+  public void testCurrentTimeWithinWindow()
+      throws ParseException {
+    // Current hour
+    Calendar now = Calendar.getInstance();
+    int currentHour = now.get(Calendar.HOUR_OF_DAY);
+
+    // Generate a window encapsulating the current time
+    int windowStartInHours = currentHour < 2 ? 24 + (currentHour - 2) : currentHour - 2;
+    int windowEndInHours = (currentHour + 3) % 24;
+    int delayInMinutes = 0;
+
+    // Get computed scheduled time
+    String outputScheduledString =
+        AzkabanAjaxAPIClient.getScheduledTimeInAzkabanFormat(windowStartInHours, windowEndInHours, delayInMinutes);
+
+    // Verify that output schedule time is within window
+    Assert.assertTrue(isWithinWindow(windowStartInHours, windowEndInHours, outputScheduledString));
+  }
+
+  @Test
+  public void testCurrentTimeOutsideWindow()
+      throws ParseException {
+    // Current hour
+    Calendar now = Calendar.getInstance();
+    int currentHour = now.get(Calendar.HOUR_OF_DAY);
+
+    // Generate a window NOT encapsulating the current time
+    int windowStartInHours = currentHour > 10 ? 1 : 11;
+    int windowEndInHours = currentHour > 10 ? 6 : 16;
+    int delayInMinutes = 0;
+
+    // Get computed scheduled time
+    String outputScheduledString =
+        AzkabanAjaxAPIClient.getScheduledTimeInAzkabanFormat(windowStartInHours, windowEndInHours, delayInMinutes);
+
+    // Verify that output schedule time is within window
+    Assert.assertTrue(isWithinWindow(windowStartInHours, windowEndInHours, outputScheduledString));
+  }
+
+  private boolean isWithinWindow(int windowStartInHours, int windowEndInHours, String outputScheduledString)
+      throws ParseException {
+    Calendar windowStart = Calendar.getInstance();
+    windowStart.set(Calendar.HOUR_OF_DAY, windowStartInHours);
+    windowStart.set(Calendar.MINUTE, 0);
+    windowStart.set(Calendar.SECOND, 0);
+
+    Calendar windowEnd = Calendar.getInstance();
+    windowEnd.set(Calendar.HOUR_OF_DAY, windowEndInHours);
+    windowEnd.set(Calendar.MINUTE, 0);
+    windowEnd.set(Calendar.SECOND, 0);
+
+    Date outputDate = new SimpleDateFormat("hh,mm,a,z").parse(outputScheduledString);
+    Calendar receivedTime = Calendar.getInstance();
+    receivedTime.set(Calendar.HOUR_OF_DAY, Integer.parseInt(new SimpleDateFormat("HH").format(outputDate)));
+    receivedTime.set(Calendar.MINUTE, Integer.parseInt(new SimpleDateFormat("mm").format(outputDate)));
+
+    log.info("Window start time is: " + new SimpleDateFormat("MM/dd/yyyy hh,mm,a,z").format(windowStart.getTime()));
+    log.info("Window end time is: " + new SimpleDateFormat("MM/dd/yyyy hh,mm,a,z").format(windowEnd.getTime()));
+    log.info("Output time is: " + new SimpleDateFormat("MM/dd/yyyy hh,mm,a,z").format(receivedTime.getTime()));
+
+    return receivedTime.after(windowStart) && receivedTime.before(windowEnd);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
new file mode 100644
index 0000000..9e189ab
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+
+@Slf4j
+@Test(groups = { "org.apache.gobblin.service.modules.orchestration" })
+public class AzkabanProjectConfigTest {
+
+  @Test
+  public void testProjectNameDefault() throws Exception {
+    String expectedProjectName = "GobblinService__uri";
+
+    Properties properties = new Properties();
+    JobSpec jobSpec = new JobSpec(new URI("uri"), "0.0", "test job spec",
+        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
+
+    String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
+
+    Assert.assertEquals(actualProjectName, expectedProjectName);
+  }
+
+  @Test
+  public void testProjectNameWithConfig() throws Exception {
+    String expectedProjectName = "randomPrefix_http___localhost_8000_context";
+
+    Properties properties = new Properties();
+    properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
+    JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
+        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
+
+    String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
+
+    Assert.assertEquals(actualProjectName, expectedProjectName);
+  }
+
+  @Test
+  public void testProjectNameWithReallyLongName() throws Exception {
+    String expectedProjectName = "randomPrefixWithReallyLongName_http___localhost_8000__55490420";
+
+    Properties properties = new Properties();
+    properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
+    JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
+        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
+
+    String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
+
+    Assert.assertEquals(actualProjectName, expectedProjectName);
+  }
+
+  @Test
+  public void testProjectZipFileName() throws Exception {
+    String expectedZipFileName = "randomPrefix_http___localhost_8000_context.zip";
+
+    Properties properties = new Properties();
+    properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
+    JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
+        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
+
+    String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
+
+    Assert.assertEquals(actualZipFileName, expectedZipFileName);
+  }
+
+  @Test
+  public void testProjectZipFileNameForLongName() throws Exception {
+    String expectedZipFileName = "randomPrefixWithReallyLongName_http___localhost_8000__55490420.zip";
+
+    Properties properties = new Properties();
+    properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
+    JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
+        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
+
+    String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
+
+    Assert.assertEquals(actualZipFileName, expectedZipFileName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee3e5481/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf b/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf
new file mode 100644
index 0000000..936b8a1
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/resources/reference.conf
@@ -0,0 +1,15 @@
+# Sample configuration properties with default values
+
+# Cluster configuration properties
+gobblin.cluster.app.name=GobblinStandaloneCluster
+gobblin.cluster.email.notification.on.shutdown=false
+gobblin.cluster.helix.instance.max.retries=2
+gobblin.cluster.work.dir=/tmp/gobblin-cluster
+
+# Helix/Zookeeper configuration properties
+gobblin.cluster.helix.cluster.name=GobblinStandaloneCluster
+gobblin.cluster.zk.connection.string="localhost:2181"
+
+fs.uri="file:///"
+
+job.execinfo.server.enabled=false


[8/9] incubator-gobblin git commit: Azkaban orchestrator findbugs fix, addition of default config fallback, updated config values

Posted by ab...@apache.org.
Azkaban orchestrator findbugs fix, addition of default config fallback, updated config values


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b0c96aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b0c96aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b0c96aca

Branch: refs/heads/master
Commit: b0c96acabf9056b8caa8e565f737be989f449056
Parents: ee3e548
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 08:07:38 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 08:07:38 2017 -0700

----------------------------------------------------------------------
 conf/service/log4j-cluster.properties           | 27 ------------------
 conf/service/log4j-service.properties           | 27 ++++++++++++++++++
 .../orchestration/AzkabanAjaxAPIClient.java     |  5 +++-
 .../modules/orchestration/AzkabanJobHelper.java | 30 ++++++++++++++++++--
 .../orchestration/AzkabanProjectConfig.java     |  4 +--
 .../AzkabanSpecExecutorInstance.java            |  4 ++-
 .../AzkabanSpecExecutorInstanceProducer.java    |  6 ++--
 .../orchestration/ServiceAzkabanConfigKeys.java |  1 +
 .../main/resources/default-service-azkaban.conf |  2 +-
 .../gobblin/service/ServiceConfigKeys.java      |  2 ++
 .../modules/core/GobblinServiceManager.java     |  1 +
 11 files changed, 70 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/conf/service/log4j-cluster.properties
----------------------------------------------------------------------
diff --git a/conf/service/log4j-cluster.properties b/conf/service/log4j-cluster.properties
deleted file mode 100755
index a7ffb68..0000000
--- a/conf/service/log4j-cluster.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=info,stdout
-log4j.threshhold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %X{tableName} - %m%n
-
-# Suppressed loggers
-log4j.logger.org.apache.helix.controller.GenericHelixController=ERROR
-log4j.logger.org.apache.helix.controller.stages=ERROR
-log4j.logger.org.apache.helix.controller.strategy.AutoRebalanceStrategy=ERROR
-log4j.logger.org.apache.helix.manager.zk=ERROR
-log4j.logger.org.apache.helix.monitoring.mbeans.ClusterStatusMonitor=ERROR
-log4j.logger.org.apache.helix.store.zk.AutoFallbackPropertyStore=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/conf/service/log4j-service.properties
----------------------------------------------------------------------
diff --git a/conf/service/log4j-service.properties b/conf/service/log4j-service.properties
new file mode 100755
index 0000000..a7ffb68
--- /dev/null
+++ b/conf/service/log4j-service.properties
@@ -0,0 +1,27 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %X{tableName} - %m%n
+
+# Suppressed loggers
+log4j.logger.org.apache.helix.controller.GenericHelixController=ERROR
+log4j.logger.org.apache.helix.controller.stages=ERROR
+log4j.logger.org.apache.helix.controller.strategy.AutoRebalanceStrategy=ERROR
+log4j.logger.org.apache.helix.manager.zk=ERROR
+log4j.logger.org.apache.helix.monitoring.mbeans.ClusterStatusMonitor=ERROR
+log4j.logger.org.apache.helix.store.zk.AutoFallbackPropertyStore=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 90bf005..9c3cee4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -308,7 +308,7 @@ public class AzkabanAjaxAPIClient {
       throws IOException {
     Map<String, String> params = Maps.newHashMap();
     params.put("ajax", "executeFlow");
-    params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("project", azkabanProjectConfig.getAzkabanProjectName());
     params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
 
     executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params));
@@ -454,6 +454,9 @@ public class AzkabanAjaxAPIClient {
    * @param delayMinutes If current time is within window, then additional delay for bootstrapping if desired
    * @return Scheduled time string of the format hh,mm,a,z
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value = "DMI_RANDOM_USED_ONLY_ONCE",
+      justification = "As expected for randomization")
   public static String getScheduledTimeInAzkabanFormat(int windowStartHour, int windowEndHour, int delayMinutes) {
     // Validate
     if (windowStartHour < 0 || windowEndHour > 23 || windowStartHour >= windowEndHour) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index a74a6ad..4fbe32b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -263,7 +263,13 @@ public class AzkabanJobHelper {
     // Determine final zip file path
     String zipFilePath = String.format("%s/%s", directory, zipFilename);
     File zipFile = new File(zipFilePath);
-    zipFile.delete();
+    if (zipFile.exists()) {
+      if (zipFile.delete()) {
+        log.info("Zipfile existed and was deleted: " + zipFilePath);
+      } else {
+        log.warn("Zipfile exists but was not deleted: " + zipFilePath);
+      }
+    }
 
     // Create and add files to zip file
     addFilesToZip(zipFile, filesToAdd);
@@ -271,6 +277,9 @@ public class AzkabanJobHelper {
     return zipFilePath;
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value = "OBL_UNSATISFIED_OBLIGATION",
+      justification = "Lombok construct of @Cleanup is handing this, but not detected by FindBugs")
   private static void addFilesToZip(File zipFile, List<File> filesToAdd) throws IOException {
     try {
       @Cleanup
@@ -300,7 +309,13 @@ public class AzkabanJobHelper {
     // Determine final config file path
     String jobFilePath = String.format("%s/%s.job", workDir, flowName);
     File jobFile = new File(jobFilePath);
-    jobFile.delete();
+    if (jobFile.exists()) {
+      if (jobFile.delete()) {
+        log.info("JobFile existed and was deleted: " + jobFilePath);
+      } else {
+        log.warn("JobFile exists but was not deleted: " + jobFilePath);
+      }
+    }
 
     StringBuilder propertyFileContent = new StringBuilder();
     for (Map.Entry entry : azkabanProjectConfig.getJobSpec().getConfigAsProperties().entrySet()) {
@@ -313,6 +328,9 @@ public class AzkabanJobHelper {
     return new File[] {jobFile};
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value = "OBL_UNSATISFIED_OBLIGATION",
+      justification = "Lombok construct of @Cleanup is handing this, but not detected by FindBugs")
   private static File downloadAzkabanJobJar(String workDir, String jobJarUrl)
       throws IOException {
     // Determine final jar file path
@@ -320,7 +338,13 @@ public class AzkabanJobHelper {
     String jobJarName = jobJarUrlParts[jobJarUrlParts.length-1];
     String jobJarFilePath = String.format("%s/%s", workDir, jobJarName);
     File jobJarFile = new File(jobJarFilePath);
-    jobJarFile.delete();
+    if (jobJarFile.exists()) {
+      if (jobJarFile.delete()) {
+      log.info("JobJarFilePath existed and was deleted: " + jobJarFilePath);
+    } else {
+        log.warn("JobJarFilePath exists but was not deleted: " + jobJarFilePath);
+      }
+    }
 
     // Create work directory if not already exists
     FileUtils.forceMkdir(new File(workDir));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index b99683d..583988b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -39,8 +39,6 @@ import com.typesafe.config.ConfigFactory;
  * Class to hold Azkaban project specific configs
  */
 public class AzkabanProjectConfig {
-  private static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
-
   private final String azkabanServerUrl;
 
   private final String azkabanProjectName;
@@ -60,7 +58,7 @@ public class AzkabanProjectConfig {
   public AzkabanProjectConfig(JobSpec jobSpec) {
     // Extract config objects
     this.jobSpec = jobSpec;
-    Config defaultConfig = ConfigFactory.load(DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+    Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
     Config config  = jobSpec.getConfig().withFallback(defaultConfig);
 
     // Azkaban Infrastructure

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
index 65209c3..dcc89cc 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
@@ -35,6 +35,7 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 
 public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
@@ -48,7 +49,8 @@ public class AzkabanSpecExecutorInstance extends AbstractIdleService implements
   protected final Map<String, String> _capabilities;
 
   public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) {
-    _config = config;
+    Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+    _config = config.withFallback(defaultConfig);
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     try {
       _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index f73bc6c..47df250 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -47,9 +47,9 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
 
     try {
       // Initialize Azkaban client / producer and cache credentials
-      String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
-      String azkabanPassword = getAzkabanPassword(config);
-      String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+      String azkabanUsername = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+      String azkabanPassword = getAzkabanPassword(_config);
+      String azkabanServerUrl = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
 
       _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
     } catch (IOException | EncoderException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
index b712a5a..4c24944 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -37,5 +37,6 @@ public class ServiceAzkabanConfigKeys {
 
   // Azkaban System Environment
   public static final String AZKABAN_PASSWORD_SYSTEM_KEY = "GOBBLIN_SERVICE_AZKABAN_PASSWORD";
+  public static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
index 6d31984..caf6ebe 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -37,5 +37,5 @@ gobblin.service.azkaban.project.zip.failIfJarNotFound=false
 gobblin.service.azkaban.project.zip.additionalFilesUrl=""
 
 gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
-gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin.jar,gobblin-api.jar,gobblin-compaction.jar,gobblin-config-management.jar,gobblin-core.jar,gobblin-core-base.jar,gobblin-distribution.jar,gobblin-example.jar,gobblin-hive-registration.jar,gobblin-metrics-libs.jar,gobblin-metastore.jar,gobblin-modules.jar,gobblin-rest-service.jar,gobblin-runtime.jar,gobblin-runtime-hadoop.jar,gobblin-utility.jar,gobblin-salesforce.jar,gobblin-test-harness.jar,gobblin-tunnel.jar,gobblin-data-management.jar,gobblin-config-management.jar,gobblin-audit.jar,gobblin-yarn.jar,gobblin-cluster.jar,gobblin-aws.jar,gobblin-service.jar,gobblin-test-utils.jar"
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-config-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
 gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index a6f0199..8ea19c4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -75,4 +75,6 @@ public class ServiceConfigKeys {
   // Template Catalog Keys
   public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
 
+  // Logging
+  public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b0c96aca/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 4707361..c2591e1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -37,6 +37,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;


[4/9] incubator-gobblin git commit: Add ability to read and override Azkaban password from environment

Posted by ab...@apache.org.
Add ability to read and override Azkaban password from environment


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2164a1e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2164a1e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2164a1e6

Branch: refs/heads/master
Commit: 2164a1e6a8c2cc556f070bfab11409a701bcddd5
Parents: f423d7d
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 00:36:12 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 00:36:12 2017 -0700

----------------------------------------------------------------------
 .../AzkabanSpecExecutorInstanceProducer.java             | 11 ++++++++++-
 .../modules/orchestration/ServiceAzkabanConfigKeys.java  |  3 +++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2164a1e6/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index f093af2..5471f0c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.commons.codec.EncoderException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
@@ -47,7 +48,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
     try {
       // Initialize Azkaban client / producer and cache credentials
       String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
-      String azkabanPassword = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+      String azkabanPassword = getAzkabanPassword(config);
       String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
 
       sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
@@ -56,6 +57,14 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
     }
   }
 
+  private String getAzkabanPassword(Config config) {
+    if (StringUtils.isNotBlank(System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY))) {
+      return System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY);
+    }
+
+    return ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY, StringUtils.EMPTY);
+  }
+
   public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) {
     this(config, Optional.of(log));
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2164a1e6/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
index 762561c..b712a5a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -34,5 +34,8 @@ public class ServiceAzkabanConfigKeys {
   public static final String AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.failIfJarNotFound";
   public static final String AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.additionalFilesUrl";
   public static final String AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.overwriteIfExists";
+
+  // Azkaban System Environment
+  public static final String AZKABAN_PASSWORD_SYSTEM_KEY = "GOBBLIN_SERVICE_AZKABAN_PASSWORD";
 }
 


[5/9] incubator-gobblin git commit: Add config to support Azkaban solo server default deployment (as per Azkaban website)

Posted by ab...@apache.org.
Add config to support Azkaban solo server default deployment (as per Azkaban website)


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e285202d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e285202d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e285202d

Branch: refs/heads/master
Commit: e285202de01f2fa2e7b0f5d8132e25fc87f7a0fb
Parents: 2164a1e
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 02:12:46 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 02:12:46 2017 -0700

----------------------------------------------------------------------
 .../src/main/resources/default-service-azkaban.conf       | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e285202d/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
index 01a26f1..6d31984 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -17,6 +17,14 @@
 
 # Default values
 # These values should generally come from template being used by the Service
+
+# Default config for Azkaban Infrastructure
+# (Current defaults are for default local Azkaban setup)
+gobblin.service.azkaban.username=azkaban
+gobblin.service.azkaban.password=azkaban
+gobblin.service.azkaban.server.url="http://localhost:8081/"
+
+# Default config for Azkaban Projects
 gobblin.service.azkaban.project.namePrefix=GobblinService_
 gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
 gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
@@ -29,5 +37,5 @@ gobblin.service.azkaban.project.zip.failIfJarNotFound=false
 gobblin.service.azkaban.project.zip.additionalFilesUrl=""
 
 gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
-gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-config-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin.jar,gobblin-api.jar,gobblin-compaction.jar,gobblin-config-management.jar,gobblin-core.jar,gobblin-core-base.jar,gobblin-distribution.jar,gobblin-example.jar,gobblin-hive-registration.jar,gobblin-metrics-libs.jar,gobblin-metastore.jar,gobblin-modules.jar,gobblin-rest-service.jar,gobblin-runtime.jar,gobblin-runtime-hadoop.jar,gobblin-utility.jar,gobblin-salesforce.jar,gobblin-test-harness.jar,gobblin-tunnel.jar,gobblin-data-management.jar,gobblin-config-management.jar,gobblin-audit.jar,gobblin-yarn.jar,gobblin-cluster.jar,gobblin-aws.jar,gobblin-service.jar,gobblin-test-utils.jar"
 gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file


[6/9] incubator-gobblin git commit: Support to execute Azkaban project from Orchestrator

Posted by ab...@apache.org.
Support to execute Azkaban project from Orchestrator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0bb5139c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0bb5139c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0bb5139c

Branch: refs/heads/master
Commit: 0bb5139c8822ded33295b9eb118b67df1cb9f418
Parents: e285202
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 03:36:04 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 03:36:04 2017 -0700

----------------------------------------------------------------------
 .../orchestration/AzkabanAjaxAPIClient.java     | 340 +++++++++++--------
 .../modules/orchestration/AzkabanJobHelper.java |  70 +++-
 .../orchestration/AzkabanProjectConfig.java     |   2 +-
 .../AzkabanSpecExecutorInstanceProducer.java    |  59 ++--
 gradle/scripts/dependencyDefinitions.gradle     |   1 +
 5 files changed, 307 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 31fc753..d0b8471 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -37,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.EncoderException;
 import org.apache.commons.codec.net.URLCodec;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -55,13 +52,14 @@ import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
 
 @Slf4j
 public class AzkabanAjaxAPIClient {
-
   private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
 
   // TODO: Ensure GET call urls do not grow too big
@@ -71,93 +69,107 @@ public class AzkabanAjaxAPIClient {
   private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
   private static final URLCodec codec = new URLCodec();
 
+  /***
+   * Authenticate a user and obtain a session.id from response. Once a session.id has been obtained,
+   * until the session expires, this id can be used to do any API requests with a proper permission granted.
+   * A session expires if the user logs out, changes machine, browser or location, if Azkaban is restarted,
+   * or if the session times out. The default session timeout is 24 hours (one day). User can re-login irrespective
+   * of whether the session has expired or not. For the same user, a new session will always override the old one.
+   * @param username Username.
+   * @param password Password.
+   * @param azkabanServerUrl Azkaban Server Url.
+   * @return Session Id.
+   * @throws IOException
+   * @throws EncoderException
+   */
   public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
       throws IOException, EncoderException {
     // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl);
-    StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
-        username, codec.encode(password)));
-    input.setContentType("application/x-www-form-urlencoded");
-    postRequest.setEntity(input);
-    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+    Map<String, String> params = Maps.newHashMap();
+    params.put("action", "login");
+    params.put("username", username);
+    params.put("password", codec.encode(password));
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(postRequest);
-
-    return handleResponse(response, "session.id").get("session.id");
+    return executePostRequest(preparePostRequest(azkabanServerUrl, null, params)).get("session.id");
   }
 
+  /***
+   * Get project.id for a Project Name.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
     // Note: Every get call to Azkaban provides a projectId in response, so we have are using fetchProjectFlows call
     // .. because it does not need any additional params other than project name
-    // Create get request
-    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&"
-            + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId,
-        azkabanProjectConfig.getAzkabanProjectName()));
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "fetchprojectflows");
+    params.put("project", azkabanProjectConfig.getAzkabanProjectName());
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    return handleResponse(response, "projectId").get("projectId");
+    return executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager",
+        sessionId, params)).get("projectId");
   }
 
+  /***
+   * Creates an Azkaban project and uploads the zip file. If proxy user and group permissions are specified in
+   * Azkaban Project Config, then this method also adds it to the project configuration.
+   * @param sessionId Session Id.
+   * @param zipFilePath Zip file to upload.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String createAzkabanProject(String sessionId, String zipFilePath,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "executeFlow");
+    params.put("name", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("description", azkabanProjectConfig.getAzkabanProjectDescription());
 
-    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
-    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
-    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
-    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
-    // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
-    StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId,
-        azkabanProjectName, azkabanProjectDescription));
-    input.setContentType("application/x-www-form-urlencoded");
-    postRequest.setEntity(input);
-    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
-
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(postRequest);
-    handleResponse(response);
+    executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() +
+        "/manager?action=create", sessionId, params));
 
     // Add proxy user if any
     if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
       Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
       for (String user : proxyUsers) {
-        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+        addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), user);
       }
     }
 
     // Add group permissions if any
     // TODO: Support users (not just groups), and different permission types
     // (though we can add users, we only support groups at the moment and award them with admin permissions)
-    if (StringUtils.isNotBlank(groupAdminUsers)) {
-      String [] groups = StringUtils.split(groupAdminUsers, ",");
+    if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+      String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
       for (String group : groups) {
-        addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
-            false, false);
+        addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(),
+            group, true, true, false, false,false,
+            false);
       }
     }
 
     // Upload zip file to azkaban and return projectId
-    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+    return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
   }
 
+  /***
+   * Replace an existing Azkaban Project. If proxy user and group permissions are specified in
+   * Azkaban Project Config, then this method also adds it to the project configuration.
+   * @param sessionId Session Id.
+   * @param zipFilePath Zip file to upload.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String replaceAzkabanProject(String sessionId, String zipFilePath,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
-
-    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
-    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
-    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
-    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
-
     // Change project description
-    changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+    changeProjectDescription(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName(), azkabanProjectConfig.getAzkabanProjectDescription());
 
     // Add proxy user if any
     // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
@@ -166,7 +178,8 @@ public class AzkabanAjaxAPIClient {
     if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
       Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
       for (String user : proxyUsers) {
-        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+        addProxyUser(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+            azkabanProjectConfig.getAzkabanProjectName(), user);
       }
     }
 
@@ -175,12 +188,13 @@ public class AzkabanAjaxAPIClient {
     // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
     //       2. Adding same group-user will return an error message, but we will ignore it
     // (though we can add users, we only support groups at the moment and award them with admin permissions)
-    if (StringUtils.isNotBlank(groupAdminUsers)) {
-      String [] groups = StringUtils.split(groupAdminUsers, ",");
+    if (StringUtils.isNotBlank(azkabanProjectConfig.getAzkabanGroupAdminUsers())) {
+      String [] groups = StringUtils.split(azkabanProjectConfig.getAzkabanGroupAdminUsers(), ",");
       for (String group : groups) {
         try {
-          addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
-              false);
+          addUserPermission(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+              azkabanProjectConfig.getAzkabanProjectName(), group, true, true,
+              false, false, false,false);
         } catch (IOException e) {
           // Ignore if group already exists, we cannot list existing groups; so its okay to attempt adding exiting
           // .. groups
@@ -192,21 +206,20 @@ public class AzkabanAjaxAPIClient {
     }
 
     // Upload zip file to azkaban and return projectId
-    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+    return uploadZipFileToAzkaban(sessionId, azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName(), zipFilePath);
   }
 
   private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
       String proxyUser)
       throws IOException {
-
     // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
-    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&"
-        + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser));
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "addProxyUser");
+    params.put("project", azkabanProjectName);
+    params.put("name", proxyUser);
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    handleResponse(response);
+    executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
   }
 
   private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
@@ -219,100 +232,155 @@ public class AzkabanAjaxAPIClient {
 
     // Create get request (adding the same normal user permission multiple times will throw an error, but we cannot
     // list the existing permissions anyway)
-    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&"
-            + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
-            + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name,
-        isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission));
-
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    handleResponse(response);
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "addPermission");
+    params.put("project", azkabanProjectName);
+    params.put("name", name);
+    params.put("group", Boolean.toString(isGroup));
+    params.put("permissions[admin]", Boolean.toString(adminPermission));
+    params.put("permissions[read]", Boolean.toString(readPermission));
+    params.put("permissions[write]", Boolean.toString(writePermission));
+    params.put("permissions[execute]", Boolean.toString(executePermission));
+    params.put("permissions[schedule]", Boolean.toString(schedulePermission));
+
+    executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
   }
 
-  private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
-      String jobZipFile)
+  /***
+   * Schedule the Azkaban Project to run with a schedule.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @throws IOException
+   */
+  public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "scheduleFlow");
+    params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+    params.put("projectId", azkabanProjectId);
+    params.put("scheduleTime", getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR,
+        LOW_NETWORK_TRAFFIC_END_HOUR, JOB_START_DELAY_MINUTES));
+    params.put("scheduleDate", getScheduledDateInAzkabanFormat());
+    params.put("is_recurring", "off");
+
+    // Run once OR push down schedule (TODO: Enable when push down is finalized)
+    //    if (azkabanProjectConfig.isScheduled()) {
+    //      params.put("is_recurring", "on");
+    //      params.put("period", "1d");
+    //    } else {
+    //      params.put("is_recurring", "off");
+    //    }
+
+    executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/schedule", sessionId, params));
+  }
 
-    // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
-    HttpEntity entity = MultipartEntityBuilder
-        .create()
-        .addTextBody("session.id", sessionId)
-        .addTextBody("ajax", "upload")
-        .addBinaryBody("file", new File(jobZipFile),
-            ContentType.create("application/zip"), azkabanProjectName + ".zip")
-        .addTextBody("project", azkabanProjectName)
-        .build();
-    postRequest.setEntity(entity);
+  private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String projectDescription)
+      throws IOException {
+    String encodedProjectDescription;
+    try {
+      encodedProjectDescription = new URLCodec().encode(projectDescription);
+    } catch (EncoderException e) {
+      throw new IOException("Could not encode Azkaban project description", e);
+    }
 
-    // Make the call, get response
-    @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(postRequest);
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "changeDescription");
+    params.put("project", azkabanProjectName);
+    params.put("description", encodedProjectDescription);
 
-    // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
-    return handleResponse(response, "projectId").get("projectId");
+    executeGetRequest(prepareGetRequest(azkabanServerUrl + "/manager", sessionId, params));
   }
 
-  public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+  /***
+   * Execute an existing Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @throws IOException
+   */
+  public static void executeAzkabanProject(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
-    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
-    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
-    String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+    Map<String, String> params = Maps.newHashMap();
+    params.put("ajax", "executeFlow");
+    params.put("projectName", azkabanProjectConfig.getAzkabanProjectName());
+    params.put("flow", azkabanProjectConfig.getAzkabanProjectFlowName());
+
+    executePostRequest(preparePostRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/executor", sessionId, params));
+  }
+
+  private static HttpGet prepareGetRequest(String requestUrl, String sessionId, Map<String, String> params)
+      throws IOException {
+    // Create get request
+    StringBuilder stringEntityBuilder = new StringBuilder();
+    stringEntityBuilder.append(String.format("?session.id=%s", sessionId));
+    for (Map.Entry<String, String> entry : params.entrySet()) {
+      stringEntityBuilder.append(String.format("&%s=%s", entry.getKey(), entry.getValue()));
+    }
 
-    String scheduleString = "is_recurring=off"; // run only once
-    // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
-//    if (azkabanProjectConfig.isScheduled()) {
-//      scheduleString = "is_recurring=on&period=1d"; // schedule once every day
-//    }
+    return new HttpGet(requestUrl + stringEntityBuilder);
+  }
 
+  private static HttpPost preparePostRequest(String requestUrl, String sessionId, Map<String, String> params)
+      throws IOException {
     // Create post request
-    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule");
-    StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow"
-            + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
-        sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId,
-        getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR,
-            JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString));
+    HttpPost postRequest = new HttpPost(requestUrl);
+    StringBuilder stringEntityBuilder = new StringBuilder();
+    stringEntityBuilder.append(String.format("session.id=%s", sessionId));
+    for (Map.Entry<String, String> entry : params.entrySet()) {
+      if (stringEntityBuilder.length() > 0) {
+        stringEntityBuilder.append("&");
+      }
+      stringEntityBuilder.append(String.format("%s=%s", entry.getKey(), entry.getValue()));
+    }
+    StringEntity input = new StringEntity(stringEntityBuilder.toString());
     input.setContentType("application/x-www-form-urlencoded");
     postRequest.setEntity(input);
     postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
 
+    return postRequest;
+  }
+
+  private static Map<String, String> executeGetRequest(HttpGet getRequest) throws IOException {
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(getRequest);
+    return handleResponse(response);
+  }
+
+  private static Map<String, String> executePostRequest(HttpPost postRequest) throws IOException {
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
     HttpResponse response = httpClient.execute(postRequest);
-    handleResponse(response);
+    return handleResponse(response);
   }
 
-  private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
-      String projectDescription)
+  private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String jobZipFile)
       throws IOException {
 
-    HttpGet getRequest;
-    try {
-      // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
-      getRequest = new HttpGet(String
-          .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl,
-              sessionId, azkabanProjectName, new URLCodec().encode(projectDescription)));
-    } catch (EncoderException e) {
-      throw new IOException("Could not encode Azkaban project description", e);
-    }
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+    HttpEntity entity = MultipartEntityBuilder
+        .create()
+        .addTextBody("session.id", sessionId)
+        .addTextBody("ajax", "upload")
+        .addBinaryBody("file", new File(jobZipFile),
+            ContentType.create("application/zip"), azkabanProjectName + ".zip")
+        .addTextBody("project", azkabanProjectName)
+        .build();
+    postRequest.setEntity(entity);
 
     // Make the call, get response
     @Cleanup CloseableHttpClient httpClient = getHttpClient();
-    HttpResponse response = httpClient.execute(getRequest);
-    handleResponse(response);
-  }
+    HttpResponse response = httpClient.execute(postRequest);
 
-  public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
-      AzkabanProjectConfig azkabanProjectConfig)
-      throws IOException {
-    boolean isGoUrl = false;
-    if (!StringUtils.isBlank(uberdistcp2ToolServer)) {
-      if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) {
-        isGoUrl = true;
-      }
-    }
+    // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
+    return handleResponse(response, "projectId").get("projectId");
   }
 
   private static CloseableHttpClient getHttpClient()
@@ -350,11 +418,9 @@ public class AzkabanAjaxAPIClient {
       // Handle error if any
       handleResponseError(jsonObject);
 
-      // Get required responseKeys
-      if (ArrayUtils.isNotEmpty(responseKeys)) {
-        for (String responseKey : responseKeys) {
-          responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
-        }
+      // Get all responseKeys
+      for(Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
+        responseMap.put(entry.getKey(), entry.getValue().toString().replaceAll("\"", ""));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index 627761e..a74a6ad 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -46,6 +46,13 @@ import com.google.common.collect.Lists;
 @Slf4j
 public class AzkabanJobHelper {
 
+  /***
+   * Checks if an Azkaban project exists by name.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains project name.
+   * @return true if project exists else false.
+   * @throws IOException
+   */
   public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
@@ -74,6 +81,13 @@ public class AzkabanJobHelper {
     }
   }
 
+  /***
+   * Get Project Id by an Azkaban Project Name.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains project Name.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -83,6 +97,14 @@ public class AzkabanJobHelper {
     return projectId;
   }
 
+  /***
+   * Create project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+   * Azkaban, setting permissions and schedule.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -98,6 +120,15 @@ public class AzkabanJobHelper {
     return projectId;
   }
 
+  /***
+   * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
+   * Azkaban, setting permissions and schedule.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @return Project Id.
+   * @throws IOException
+   */
   public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig) throws IOException {
     log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
@@ -113,6 +144,13 @@ public class AzkabanJobHelper {
     return projectId;
   }
 
+  /***
+   * Schedule an already created Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+   * @throws IOException
+   */
   public static void scheduleJob(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
@@ -120,6 +158,13 @@ public class AzkabanJobHelper {
     AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
   }
 
+  /***
+   * Change the schedule of an already created Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains schedule information.
+   * @throws IOException
+   */
   public static void changeJobSchedule(String sessionId, String azkabanProjectId,
       AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
@@ -127,7 +172,28 @@ public class AzkabanJobHelper {
     AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
   }
 
-  public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+  /***
+   * Execute an already created Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectId Project Id.
+   * @param azkabanProjectConfig Azkaban Project Config that contains the project name, flow name and server URL.
+   * @throws IOException
+   */
+  public static void executeJob(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Executing Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+    AzkabanAjaxAPIClient.executeAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+
+  /***
+   * Create Azkaban project zip file.
+   * @param azkabanProjectConfig Azkaban Project Config that contains information about what to include in
+   *                             zip file.
+   * @return Zip file path.
+   * @throws IOException
+   */
+  private static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
       throws IOException {
     log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
     String workDir = azkabanProjectConfig.getWorkDir();
@@ -153,7 +219,7 @@ public class AzkabanJobHelper {
           jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl);
           filesToAdd.add(jobJarFile);
         } catch (IOException e) {
-          if(failIfJarNotFound) {
+          if (failIfJarNotFound) {
             throw e;
           }
           log.warn("Could not download: " + jobJarFile);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index ddae3d9..2bac65d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -69,7 +69,7 @@ public class AzkabanProjectConfig {
     this.azkabanProjectName = constructProjectName(jobSpec, config);
     this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
     this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
-    this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+    this.azkabanGroupAdminUsers = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY, "");
     this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
 
     // Azkaban Project Zip

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
index 5471f0c..f73bc6c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -39,7 +39,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
     implements SpecExecutorInstanceProducer<Spec>, Closeable {
 
   // Session Id for GaaS User
-  private String sessionId;
+  private String _sessionId;
 
 
   public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
@@ -51,7 +51,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
       String azkabanPassword = getAzkabanPassword(config);
       String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
 
-      sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+      _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
     } catch (IOException | EncoderException e) {
       throw new RuntimeException("Could not authenticate with Azkaban", e);
     }
@@ -82,37 +82,46 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
   @Override
   public Future<?> addSpec(Spec addedSpec) {
     // If project already exists, execute it
-
-    // If project does not already exists, create and execute it
-    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
     try {
-      _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
-
-      // Deleted project also returns true if-project-exists check, so optimistically first create the project
-      // .. (it will create project if it was never created or deleted), if project exists it will fail with
-      // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
-      // .. specified
-      try {
-        createNewAzkabanProject(sessionId, azkabanProjectConfig);
-      } catch (IOException e) {
-        if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
-          if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
-              ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
-            _log.info("Project already exists for this Spec, but force overwrite specified");
-            updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+      AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+      boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
+
+      // Execute the project if it already exists; otherwise create it
+      if (azkabanProjectExists) {
+        _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+        AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
+            azkabanProjectConfig);
+      } else {
+        _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+        // A deleted project also returns true for the if-project-exists check, so optimistically first create the project
+        // .. (it will create project if it was never created or deleted), if project exists it will fail with
+        // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
+        // .. specified
+        try {
+          createNewAzkabanProject(_sessionId, azkabanProjectConfig);
+        } catch (IOException e) {
+          if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+            if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+                ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+              _log.info("Project already exists for this Spec, but force overwrite specified");
+              updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+            } else {
+              _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+                  azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+            }
           } else {
-            _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
-                azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+            throw e;
           }
-        } else {
-          throw e;
         }
       }
+
+
     } catch (IOException e) {
       throw new RuntimeException("Issue in setting up Azkaban project.", e);
     }
 
-    return null;
+    return new CompletedFuture<>(_config, null);
   }
 
   @Override
@@ -121,7 +130,7 @@ public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInst
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
 
     try {
-      updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+      updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
     } catch (IOException e) {
       throw new RuntimeException("Issue in setting up Azkaban project.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0bb5139c/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 41e1485..1e86b96 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -69,6 +69,7 @@ ext.externalDependency = [
     "hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
     "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
     "httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
+    "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
     "jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",


[2/9] incubator-gobblin git commit: Azkaban Orchestrator gradle dependencies

Posted by ab...@apache.org.
Azkaban Orchestrator gradle dependencies


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9864f69b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9864f69b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9864f69b

Branch: refs/heads/master
Commit: 9864f69bd3054d8e7dced61e1d1f60544c87775a
Parents: 08e60ef
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Tue Aug 8 12:42:41 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 8 12:42:41 2017 -0700

----------------------------------------------------------------------
 gobblin-modules/gobblin-azkaban/build.gradle | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9864f69b/gobblin-modules/gobblin-azkaban/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/build.gradle b/gobblin-modules/gobblin-azkaban/build.gradle
index d2010ba..4bebecc 100644
--- a/gobblin-modules/gobblin-azkaban/build.gradle
+++ b/gobblin-modules/gobblin-azkaban/build.gradle
@@ -33,6 +33,8 @@ dependencies {
   compile externalDependency.log4j
   compile externalDependency.guava
   compile externalDependency.commonsLang
+  compile externalDependency.httpclient
+  compile externalDependency.httpmime
   compile externalDependency.jodaTime
   compile externalDependency.lombok
   compile externalDependency.slf4j


[3/9] incubator-gobblin git commit: Merge branch 'master' into service_azkaban_orchestrator

Posted by ab...@apache.org.
Merge branch 'master' into service_azkaban_orchestrator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f423d7da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f423d7da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f423d7da

Branch: refs/heads/master
Commit: f423d7da42fe564ef67d4214bf2d31ef2bf1a32c
Parents: 9864f69 90be15f
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 30 00:20:42 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Aug 30 00:20:42 2017 -0700

----------------------------------------------------------------------
 conf/log4j-compaction.xml                       |   2 +-
 conf/log4j-mapreduce.xml                        |   4 +-
 config/checkstyle/checkstyle.xml                |  16 +
 config/checkstyle/suppressions.xml              |  10 +
 .../static/js/collections/job-executions.js     |   2 +-
 .../extractor/CheckpointableWatermark.java      |  26 +-
 .../gobblin_scopes/GobblinScopeInstance.java    |   3 +-
 .../gobblin_scopes/GobblinScopeTypes.java       |   3 +-
 .../broker/gobblin_scopes/JobScopeInstance.java |   3 +-
 .../gobblin_scopes/TaskScopeInstance.java       |   3 +-
 .../broker/iface/SubscopedBrokerBuilder.java    |   3 +-
 .../configuration/ConfigurationKeys.java        |  29 ++
 .../org/apache/gobblin/dataset/Dataset.java     |  12 +-
 .../apache/gobblin/dataset/DatasetsFinder.java  |   1 +
 .../gobblin/dataset/IterableDatasetFinder.java  |  24 +
 .../gobblin/dataset/PartitionableDataset.java   |  61 +++
 .../apache/gobblin/dataset/URNIdentified.java   |  29 ++
 .../URNLexicographicalComparator.java           |  53 +++
 .../dataset/comparators/package-info.java       |  21 +
 .../dataset/test/SimpleDatasetForTesting.java   |  38 ++
 .../test/SimpleDatasetPartitionForTesting.java  |  35 ++
 .../SimplePartitionableDatasetForTesting.java   |  56 +++
 .../test/StaticDatasetsFinderForTesting.java    |  61 +++
 .../org/apache/gobblin/fork/CopyHelper.java     |  27 +-
 .../extractor/CheckpointableWatermark.java      |  29 +-
 .../source/extractor/StreamingExtractor.java    |  27 +-
 .../source/workunit/BasicWorkUnitStream.java    |  13 +-
 .../apache/gobblin/writer/WatermarkStorage.java |  27 +-
 .../java/com/linkedin/gobblin/TestAlias.java    |  19 +-
 .../gobblin/ack/HierarchicalAckableTest.java    |   1 -
 .../gobblin_scopes/GobblinScopesTest.java       |   3 +-
 .../org/apache/gobblin/fork/CopyHelperTest.java |  27 +-
 .../gobblin/aws/AWSJobConfigurationManager.java |   3 +-
 .../gobblin/aws/GobblinAWSClusterManager.java   |   6 +-
 .../gobblin/aws/GobblinAWSTaskRunner.java       |   6 +-
 .../gobblin/cluster/GobblinClusterManager.java  |   5 +-
 .../gobblin/cluster/GobblinHelixConstants.java  |  26 +-
 .../apache/gobblin/cluster/GobblinHelixJob.java |  45 +-
 .../cluster/GobblinHelixJobLauncher.java        |  17 +-
 .../cluster/GobblinHelixMessagingService.java   |  16 +
 .../gobblin/cluster/GobblinHelixTaskDriver.java |  26 +-
 .../gobblin/cluster/GobblinTaskRunner.java      |  12 +-
 .../apache/helix/task/GobblinJobRebalancer.java |  26 +-
 .../mapreduce/MRCompactionTaskFactory.java      |  16 +
 .../compaction/verify/CompactionVerifier.java   |  16 +
 .../verify/InputRecordCountHelper.java          |  16 +
 .../mapreduce/RenameSourceDirectoryTest.java    |  16 +
 .../suite/TestCompactionSuiteFactories.java     |  16 +
 .../verify/PinotAuditCountVerifierTest.java     |  16 +
 .../FineGrainedWatermarkTrackerBenchmark.java   |  19 +-
 .../java/org/apache/gobblin/async/Callback.java |  16 +
 .../gobblin/converter/SamplingConverter.java    |  27 +-
 ...blinTrackingEventFlattenFilterConverter.java |   1 -
 .../DefaultCheckpointableWatermark.java         |  27 +-
 .../limiter/LimiterConfigurationKeys.java       |  16 +
 .../apache/gobblin/test/AnyToJsonConverter.java |  27 +-
 .../gobblin/test/AnyToStringConverter.java      |  27 +-
 .../org/apache/gobblin/test/TestRecord.java     |  27 +-
 .../writer/AcknowledgableRecordEnvelope.java    |  27 +-
 .../gobblin/writer/AcknowledgableWatermark.java |  27 +-
 .../gobblin/writer/AsyncWriterManager.java      |  27 +-
 .../java/org/apache/gobblin/writer/Batch.java   |  27 +-
 .../apache/gobblin/writer/BatchAccumulator.java |  16 +
 .../gobblin/writer/BatchAsyncDataWriter.java    |  27 +-
 .../gobblin/writer/BufferedAsyncDataWriter.java |  27 +-
 .../gobblin/writer/BytesBoundedBatch.java       |  27 +-
 .../writer/FineGrainedWatermarkTracker.java     |  27 +-
 .../writer/FutureWrappedWriteCallback.java      |  27 +-
 .../gobblin/writer/GenericWriteResponse.java    |  27 +-
 .../writer/GenericWriteResponseWrapper.java     |  27 +-
 .../gobblin/writer/LastWatermarkTracker.java    |  27 +-
 .../writer/MultiWriterWatermarkManager.java     |  27 +-
 .../writer/MultiWriterWatermarkTracker.java     |  27 +-
 .../org/apache/gobblin/writer/RecordFuture.java |  27 +-
 .../apache/gobblin/writer/RecordMetadata.java   |  27 +-
 .../writer/SequentialBasedBatchAccumulator.java |  27 +-
 .../apache/gobblin/writer/SyncDataWriter.java   |  27 +-
 .../writer/TrackerBasedWatermarkManager.java    |  27 +-
 .../gobblin/writer/WatermarkAwareWriter.java    |  27 +-
 .../writer/WatermarkAwareWriterWrapper.java     |  27 +-
 .../apache/gobblin/writer/WatermarkManager.java |  27 +-
 .../apache/gobblin/writer/WatermarkTracker.java |  27 +-
 .../gobblin/writer/WatermarkTrackerFactory.java |  27 +-
 .../apache/gobblin/writer/WriteResponse.java    |  27 +-
 .../gobblin/writer/WriteResponseFuture.java     |  27 +-
 .../gobblin/writer/WriteResponseMapper.java     |  27 +-
 .../converter/SamplingConverterTest.java        |  26 +-
 .../writer/FineGrainedWatermarkTrackerTest.java |  26 +-
 .../writer/MultiWriterWatermarkManagerTest.java |  26 +-
 .../gobblin/writer/WatermarkTrackerTest.java    |  26 +-
 ...nElementConversionWithAvroSchemaFactory.java | 166 +++++++
 .../JsonRecordAvroSchemaToAvroConverter.java    | 128 ++++++
 .../converter/csv/CsvToJsonConverter.java       |  48 +-
 .../converter/json/BytesToJsonConverter.java    |  51 +++
 .../gobblin/lineage/LineageException.java       |  39 ++
 .../org/apache/gobblin/lineage/LineageInfo.java | 234 ++++++++++
 .../gobblin/publisher/BaseDataPublisher.java    |  18 +-
 .../publisher/HiveRegistrationPublisher.java    |  36 +-
 .../source/DatePartitionedDailyAvroSource.java  |  19 +-
 .../extractor/extract/QueryBasedSource.java     |   6 +
 .../source/extractor/partition/Partition.java   |  16 +
 .../writer/CloseOnFlushWriterWrapper.java       |  15 +-
 .../gobblin/writer/PartitionedDataWriter.java   |   8 +-
 .../gobblin/async/AsyncDataDispatcherTest.java  |  16 +
 .../avro/FlattenNestedKeyConverterTest.java     |  16 +
 ...JsonRecordAvroSchemaToAvroConverterTest.java |  81 ++++
 .../json/BytesToJsonConverterTest.java          |  43 ++
 .../string/ObjectToStringConverterTest.java     |   1 -
 .../apache/gobblin/lineage/LineageInfoTest.java | 160 +++++++
 .../publisher/BaseDataPublisherTest.java        |  32 ++
 .../AvroGenericRecordAccessorTest.java          |  26 +-
 .../RecordAccessorProviderFactoryTest.java      |  26 +-
 .../security/ssl/SSLContextFactoryTest.java     |  16 +
 .../DatePartitionedAvroFileExtractorTest.java   |  19 +-
 .../extract/QueryBasedExtractorTest.java        |  16 +
 .../extractor/partition/PartitionerTest.java    |  16 +
 .../writer/CloseOnFlushWriterWrapperTest.java   |  77 +++-
 .../writer/MetadataWriterWrapperTest.java       |  16 +
 .../gobblin/writer/PartitionedWriterTest.java   |  37 ++
 .../resources/converter/jsonToAvroRecord.json   |  13 +
 .../resources/converter/jsonToAvroSchema.avsc   |  50 +++
 .../management/copy/OwnerAndPermission.java     |  29 ++
 .../converter/AbstractAvroToOrcConverter.java   |   2 +
 .../management/copy/CloseableFsCopySource.java  |   2 +-
 .../data/management/copy/CopyConfiguration.java |   5 +
 .../data/management/copy/CopyEntity.java        |  17 +-
 .../data/management/copy/CopySource.java        |  14 +-
 .../copy/CopyableDatasetMetadata.java           |  16 +-
 .../data/management/copy/CopyableFile.java      |  12 +-
 .../copy/RecursiveCopyableDataset.java          |  11 +-
 .../management/copy/RecursivePathFinder.java    |   9 +-
 .../FileAwareInputStreamExtractor.java          |   4 +
 .../copy/hive/HivePartitionFileSet.java         |   3 +-
 .../copy/hive/UnpartitionedTableFileSet.java    |   2 +-
 .../copy/publisher/CopyDataPublisher.java       |   8 +
 .../replication/ConfigBasedMultiDatasets.java   |  15 +-
 .../replication/CopyRouteGeneratorBase.java     |   7 +
 .../copy/replication/DataFlowTopology.java      |  10 +
 .../replication/ReplicationConfiguration.java   |   2 +
 .../writer/FileAwareInputStreamDataWriter.java  |   7 +
 .../data/management/dataset/DatasetUtils.java   |   9 +
 .../dataset/DefaultFileSystemGlobFinder.java    |  16 +
 .../management/source/DatasetFinderSource.java  | 141 ++++++
 .../source/LoopingDatasetFinderSource.java      | 226 ++++++++++
 .../hive/CopyPartitionParametersTest.java       |  16 +
 .../copy/ConcurrentBoundedWorkUnitListTest.java |   2 +-
 .../copy/CopySourcePrioritizationTest.java      |   2 +-
 .../data/management/copy/CopyableFileTest.java  |   4 +-
 .../data/management/copy/CopyableFileUtils.java |   4 +-
 .../source/DatasetFinderSourceTest.java         | 137 ++++++
 .../source/LoopingDatasetFinderSourceTest.java  | 218 ++++++++++
 gobblin-docs/Powered-By.md                      |  15 +-
 .../case-studies/Kafka-HDFS-Ingestion.md        |   4 +-
 .../hive/HiveMetaStoreClientFactoryTest.java    |  16 +
 .../gobblin/metastore/DatasetStateStore.java    |  64 +++
 .../gobblin/metastore/DatasetStoreDataset.java  |  56 +++
 .../metastore/DatasetStoreDatasetFinder.java    |  93 ++++
 .../apache/gobblin/metastore/FsStateStore.java  |  13 +-
 .../apache/gobblin/metastore/StateStore.java    |  19 +
 .../metadata/DatasetStateStoreEntryManager.java |  60 +++
 .../metadata/StateStoreEntryManager.java        |  63 +++
 .../metastore/predicates/DatasetPredicate.java  |  55 +++
 .../predicates/StateStorePredicate.java         |  43 ++
 .../predicates/StoreNamePredicate.java          |  44 ++
 .../config/checkstyle/suppressions.xml          |  10 +
 .../gobblin/metrics/event/sla/SlaEventKeys.java |   1 +
 ...roToJsonRecordWithMetadataConverterTest.java |  16 +
 .../retention/Avro2OrcStaleDatasetCleaner.java  |  16 +
 .../compliance/purger/HivePurgerWriterTest.java |  16 +
 .../converter/AnyToCouchbaseJsonConverter.java  |  26 +-
 .../writer/CouchbaseEnvironmentFactory.java     |  26 +-
 .../AnyToCouchbaseJsonConverterTest.java        |  26 +-
 .../AvroStringFieldEncryptorConverter.java      |  26 +-
 ...ordToEncryptedSerializedRecordConverter.java |  26 +-
 .../StringFieldEncryptorConverter.java          |  26 +-
 .../AvroStringFieldEncryptorConverterTest.java  |  16 +
 .../crypto/GobblinEncryptionProviderTest.java   |  16 +
 ...oEncryptedSerializedRecordConverterBase.java |  26 +-
 .../apache/gobblin/crypto/GPGFileDecryptor.java | 133 +++++-
 .../gobblin/crypto/GPGFileDecryptorTest.java    |  61 +++
 .../crypto/gpg/KeyBasedEncryptionFile.txt.gpg   | Bin 0 -> 383 bytes
 .../gpg/PasswordBasedEncryptionFile.txt.gpg     |   2 +
 .../src/test/resources/crypto/gpg/private.key   |  59 +++
 .../gobblin/eventhub/EventhubMetricNames.java   |  16 +
 .../writer/BatchedEventhubDataWriter.java       |  16 +
 .../writer/EventhubBatchAccumulator.java        |  16 +
 .../eventhub/writer/EventhubDataWriter.java     |  27 +-
 .../writer/EventhubDataWriterBuilder.java       |  26 +-
 .../eventhub/writer/EventhubRequest.java        |  26 +-
 .../writer/BatchedEventhubDataWriterTest.java   |  26 +-
 .../writer/EventhubAccumulatorTest.java         |  16 +
 .../eventhub/writer/EventhubBatchTest.java      |  16 +
 .../eventhub/writer/EventhubDataWriterTest.java |  26 +-
 .../converter/AsyncHttpJoinConverter.java       |  16 +
 .../converter/AvroApacheHttpJoinConverter.java  |  16 +
 .../converter/AvroHttpJoinConverter.java        |  16 +
 .../gobblin/converter/AvroR2JoinConverter.java  |  16 +
 .../gobblin/converter/HttpJoinConverter.java    |  16 +
 .../gobblin/http/ApacheHttpAsyncClient.java     |  16 +
 .../apache/gobblin/http/ApacheHttpClient.java   |  16 +
 .../gobblin/http/ApacheHttpRequestBuilder.java  |  16 +
 .../gobblin/http/ApacheHttpResponseHandler.java |  16 +
 .../gobblin/http/ApacheHttpResponseStatus.java  |  16 +
 .../gobblin/http/ThrottledHttpClient.java       |  16 +
 .../org/apache/gobblin/r2/R2ResponseStatus.java |  16 +
 .../apache/gobblin/r2/R2RestRequestBuilder.java |  16 +
 .../gobblin/r2/R2RestResponseHandler.java       |  16 +
 .../org/apache/gobblin/utils/HttpConstants.java |  16 +
 .../org/apache/gobblin/utils/HttpUtils.java     |  16 +
 .../gobblin/writer/AvroHttpWriterBuilder.java   |  16 +
 .../java/org/apache/gobblin/HttpTestUtils.java  |  16 +
 .../org/apache/gobblin/MockGenericRecord.java   |  16 +
 .../http/ApacheHttpRequestBuilderTest.java      |  16 +
 .../apache/gobblin/r2/R2ClientFactoryTest.java  |  16 +
 .../gobblin/r2/R2RestRequestBuilderTest.java    |  16 +
 .../org/apache/gobblin/util/HttpUtilsTest.java  |  16 +
 .../gobblin/writer/AsyncHttpWriterTest.java     |  16 +
 .../KafkaSchemaRegistryConfigurationKeys.java   |   3 +
 .../kafka/schemareg/LiKafkaSchemaRegistry.java  |  33 +-
 .../writer/KafkaWriterConfigurationKeys.java    |   2 +
 .../gobblin/kafka/writer/KafkaWriterHelper.java |   1 +
 .../metrics/kafka/KafkaAvroSchemaRegistry.java  |  11 +
 .../reporter/util/KafkaAvroReporterUtil.java    |  75 ++++
 ...thMetadataToEnvelopedRecordWithMetadata.java |  26 +-
 .../apache/gobblin/metadata/types/Metadata.java |  16 +
 .../apache/gobblin/type/ContentTypeUtils.java   |  26 +-
 .../apache/gobblin/type/RecordWithMetadata.java |  26 +-
 .../type/SerializedRecordWithMetadata.java      |  26 +-
 .../converter/MetadataConverterWrapperTest.java |  26 +-
 ...tadataToEnvelopedRecordWithMetadataTest.java |  26 +-
 .../metadata/GlobalMetadataCollectorTest.java   |  16 +
 .../extractor/extract/jdbc/MysqlSource.java     |  13 +
 .../gobblin/source/jdbc/JdbcExtractor.java      |  34 +-
 .../gobblin/source/jdbc/JdbcExtractorTest.java  |  12 +
 .../google/AsyncIteratorWithDataSink.java       |  16 +
 .../GoggleIngestionConfigurationKeys.java       |  16 +
 .../ingestion/google/util/SchemaUtil.java       |  16 +
 .../local/gobblin-oozie-example-workflow.xml    |   2 +-
 .../config/checkstyle/suppressions.xml          |  10 +
 .../config/checkstyle/suppressions.xml          |  10 +
 .../config/checkstyle/suppressions.xml          |  10 +
 .../config/checkstyle/suppressions.xml          |  10 +
 .../config/checkstyle/suppressions.xml          |  10 +
 .../ThrottlingGuiceServletConfig.java           |  19 +-
 ...adoopKerberosKeytabAuthenticationPlugin.java |  19 +-
 ...adoopKerberosKeytabAuthenticationPlugin.java |  14 +
 .../gobblin/runtime/fork/MockTaskContext.java   |  19 +-
 .../main/java/gobblin/runtime/TaskState.java    |  16 +
 .../mapreduce/GobblinWorkUnitsInputFormat.java  |  33 ++
 .../runtime/CheckpointableWatermarkState.java   |  26 +-
 .../apache/gobblin/runtime/ExecutionModel.java  |  26 +-
 .../gobblin/runtime/FsDatasetStateStore.java    |  84 +++-
 ...RegTaskStateCollectorServiceHandlerImpl.java |  54 +++
 .../org/apache/gobblin/runtime/JobContext.java  |   4 +-
 .../gobblin/runtime/SafeDatasetCommit.java      |  39 +-
 .../StateStoreBasedWatermarkStorage.java        |  27 +-
 .../StateStoreBasedWatermarkStorageCli.java     |  26 +-
 .../java/org/apache/gobblin/runtime/Task.java   |  17 +-
 .../gobblin/runtime/TaskConfigurationKeys.java  |  29 +-
 .../apache/gobblin/runtime/TaskExecutor.java    | 233 +++++++++-
 .../runtime/TaskInstantiationException.java     |  26 +-
 .../runtime/TaskStateCollectorService.java      |  67 ++-
 .../TaskStateCollectorServiceHandler.java       |  44 ++
 ...teCollectorServiceHiveRegHandlerFactory.java |  33 ++
 .../runtime/commit/DatasetStateCommitStep.java  |   4 +-
 .../gobblin/runtime/crypto/DecryptCli.java      |  16 +
 .../gobblin/runtime/fork/AsynchronousFork.java  |  19 +-
 .../org/apache/gobblin/runtime/fork/Fork.java   |  19 +-
 .../gobblin/runtime/fork/SynchronousFork.java   |  19 +-
 .../FsDatasetStateStoreEntryManager.java        |  51 +++
 .../runtime/services/JMXReportingService.java   |  13 +
 .../gobblin/runtime/spec_store/FSSpecStore.java |   4 +-
 .../apache/gobblin/runtime/task/FailedTask.java |  16 +
 .../apache/gobblin/runtime/task/NoopTask.java   |  60 +++
 .../runtime/util/JobStateToJsonConverter.java   |  34 +-
 .../gobblin/runtime/util/StateStores.java       |  34 +-
 .../gobblin/scheduler/BaseGobblinJob.java       |  19 +-
 .../apache/gobblin/scheduler/JobScheduler.java  |  80 +++-
 .../runtime/FsDatasetStateStoreTest.java        |  92 ++++
 .../gobblin/runtime/JobBrokerInjectionTest.java |   2 +-
 .../gobblin/runtime/JobLauncherTestHelper.java  |  14 +-
 .../gobblin/runtime/LimiterStopEventTest.java   |  16 +
 .../gobblin/runtime/TaskContinuousTest.java     |  26 +-
 .../runtime/TaskStateCollectorServiceTest.java  |  14 +
 .../runtime/commit/CommitSequenceTest.java      |   2 +-
 .../instance/hadoop/TestHadoopConfigLoader.java |  19 +-
 .../FileBasedJobLockFactoryManagerTest.java     |  15 +-
 .../util/JobStateToJsonConverterTest.java       |  38 +-
 .../salesforce/SalesforceSourceTest.java        |  16 +
 gobblin-service/build.gradle                    |   1 +
 .../gobblin/service/ServiceConfigKeys.java      |   1 +
 .../service/modules/core/GitConfigMonitor.java  | 434 +++++++++++++++++++
 .../modules/core/GobblinServiceManager.java     |  31 ++
 .../scheduler/GobblinServiceJobScheduler.java   |  10 +
 .../modules/core/GitConfigMonitorTest.java      | 334 ++++++++++++++
 .../modules/core/GobblinServiceManagerTest.java |  76 +++-
 .../gobblin/GobblinLocalJobLauncherUtils.java   |  16 +
 .../TaskSkipErrRecordsIntegrationTest.java      |  79 ++++
 .../org/apache/gobblin/TestAvroConverter.java   |  49 +++
 .../org/apache/gobblin/TestAvroExtractor.java   |  16 +
 .../java/org/apache/gobblin/TestAvroSource.java |  16 +
 .../WriterOutputFormatIntegrationTest.java      |  16 +
 .../task_skip_err_records.properties            |  42 ++
 .../java/org/apache/gobblin/util/AvroUtils.java | 109 +++++
 .../apache/gobblin/util/DatePartitionType.java  |  20 +-
 .../org/apache/gobblin/util/FileListUtils.java  |  50 ++-
 .../org/apache/gobblin/util/HadoopUtils.java    |  20 +
 .../java/org/apache/gobblin/util/HostUtils.java |  16 +
 .../java/org/apache/gobblin/util/JvmUtils.java  |  16 +
 .../java/org/apache/gobblin/util/PathUtils.java |  12 +
 .../java/org/apache/gobblin/util/PortUtils.java |  20 +-
 .../org/apache/gobblin/util/PullFileLoader.java |  29 +-
 .../util/executors/IteratorExecutor.java        |  31 ++
 .../util/executors/MDCPropagatingCallable.java  |  20 +-
 .../MDCPropagatingExecutorService.java          |  20 +-
 .../util/executors/MDCPropagatingRunnable.java  |  20 +-
 .../MDCPropagatingScheduledExecutorService.java |  20 +-
 .../util/hadoop/GobblinSequenceFileReader.java  |  49 +++
 .../gobblin/util/io/EmptyInputStream.java       |  36 ++
 .../gobblin/util/limiter/LimiterFactory.java    |   3 +-
 .../gobblin/util/limiter/MultiLimiter.java      |   3 +-
 .../gobblin/util/limiter/NoopLimiter.java       |   3 +-
 .../limiter/broker/SharedLimiterFactory.java    |   3 +-
 .../util/limiter/broker/SharedLimiterKey.java   |   3 +-
 .../org/apache/gobblin/util/AvroUtilsTest.java  |  23 +
 .../apache/gobblin/util/FileListUtilsTest.java  | 104 +++++
 .../org/apache/gobblin/util/PortUtilsTest.java  |  20 +-
 .../apache/gobblin/util/PullFileLoaderTest.java |  12 +-
 .../gobblin/util/limiter/MultiLimiterTest.java  |   2 +-
 .../broker/SharedLimiterFactoryTest.java        |   2 +-
 .../pullFileLoaderTest/dir1/dir1.configuration  |  19 +
 .../pullFileLoaderTest/dir1/dir1.properties     |  18 -
 gradle/scripts/dependencyDefinitions.gradle     |   1 +
 gradle/scripts/testSetup.gradle                 |   5 +
 334 files changed, 8363 insertions(+), 1487 deletions(-)
----------------------------------------------------------------------