You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/10/26 23:38:03 UTC

incubator-gobblin git commit: [GOBBLIN-299] Add deletion support to Azkaban Orchestrator

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2d05b03d5 -> 278b48d41


[GOBBLIN-299] Add deletion support to Azkaban Orchestrator

Closes #2154 from abti/service_azkaban_orchestrator


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/278b48d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/278b48d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/278b48d4

Branch: refs/heads/master
Commit: 278b48d41464c1959b03c479025dac1543b9a0a7
Parents: 2d05b03
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Fri Oct 27 05:07:54 2017 +0530
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Oct 27 05:07:54 2017 +0530

----------------------------------------------------------------------
 .../orchestration/AzkabanAjaxAPIClient.java     | 16 +++++++++
 .../modules/orchestration/AzkabanJobHelper.java | 14 ++++++++
 .../orchestration/AzkabanProjectConfig.java     |  2 +-
 .../orchestration/AzkabanSpecProducer.java      |  8 +++++
 .../modules/orchestration/Orchestrator.java     | 34 +++++++++++++++++++-
 .../scheduler/GobblinServiceJobScheduler.java   | 11 +++++--
 6 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
index 9c3cee4..f54d6a5 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -157,6 +157,22 @@ public class AzkabanAjaxAPIClient {
   }
 
   /***
+   * Deletes an Azkaban project.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @throws IOException
+   */
+  public static void deleteAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    Map<String, String> params = Maps.newHashMap();
+    params.put("delete", "true");
+    params.put("project", azkabanProjectConfig.getAzkabanProjectName());
+
+    executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager",
+        sessionId, params));
+  }
+
+  /***
    * Replace an existing Azkaban Project. If proxy user and group permissions are specified in
    * Azkaban Project Config, then this method also adds it to the project configuration.
    * @param sessionId Session Id.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
index 4fbe32b..09b6dde 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -121,6 +121,20 @@ public class AzkabanJobHelper {
   }
 
   /***
+   * Delete project on Azkaban based on Azkaban config.
+   * @param sessionId Session Id.
+   * @param azkabanProjectConfig Azkaban Project Config.
+   * @throws IOException
+   */
+  public static void deleteAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Deleting Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
+
+    // Delete project
+    AzkabanAjaxAPIClient.deleteAzkabanProject(sessionId, azkabanProjectConfig);
+  }
+
+  /***
    * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to
    * Azkaban, setting permissions and schedule.
    * @param sessionId Session Id.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index 583988b..4b8bb9c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -86,7 +86,7 @@ public class AzkabanProjectConfig {
     this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false);
   }
 
-  private String constructProjectName(JobSpec jobSpec, Config config) {
+  public static String constructProjectName(JobSpec jobSpec, Config config) {
     String projectNamePrefix = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_NAME_PREFIX_KEY, "");
     String projectNamePostfix = null == jobSpec.getUri() ? "" :
         jobSpec.getUri().toString().replaceAll("_", "-").replaceAll("[^A-Za-z0-9\\-]", "_");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
index 5a491ab..7b11cef 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
@@ -141,6 +141,14 @@ public class AzkabanSpecProducer implements SpecProducer<Spec>, Closeable {
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI) {
     // Delete project
+    JobSpec jobSpec = new JobSpec.Builder(deletedSpecURI).build();
+
+    try {
+      AzkabanJobHelper.deleteAzkabanJob(_sessionId, new AzkabanProjectConfig(jobSpec));
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in deleting Azkaban project.", e);
+    }
+
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 286911b..e2d36aa 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -196,7 +196,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
           producer = specsToExecute.getValue().getProducer().get();
           Spec jobSpec = specsToExecute.getKey();
 
-          _log.info(String.format("Going to orchestrate JobSpc: %s on Executor: %s", jobSpec, producer));
+          _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
           producer.addSpec(jobSpec);
         } catch(Exception e) {
           _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer +
@@ -211,6 +211,38 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  public void remove(Spec spec) {
+    // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
+    // .. this will work for Identity compiler but not always for multi-hop.
+    // Note: Current logic assumes compilation is consistent between all executions
+    if (spec instanceof FlowSpec) {
+      Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
+
+      if (specExecutorInstanceMap.isEmpty()) {
+        _log.warn("Cannot determine an executor to delete Spec: " + spec);
+        return;
+      }
+
+      // Delete all compiled JobSpecs on their respective Executor
+      for (Map.Entry<Spec, SpecExecutor> specsToDelete : specExecutorInstanceMap.entrySet()) {
+        // Delete this spec on selected executor
+        SpecProducer producer = null;
+        try {
+          producer = specsToDelete.getValue().getProducer().get();
+          Spec jobSpec = specsToDelete.getKey();
+
+          _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
+          producer.deleteSpec(jobSpec.getUri());
+        } catch(Exception e) {
+          _log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer +
+              " for flow: " + spec, e);
+        }
+      }
+    } else {
+      throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
+    }
+  }
+
   @Nonnull
   @Override
   public MetricContext getMetricContext() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 0eabf2c..0c45daf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -229,8 +229,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
 
     try {
-      this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
-      unscheduleJob(deletedSpecURI.toString());
+      Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString());
+      if (null != deletedSpec) {
+        this.orchestrator.remove(deletedSpec);
+        this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
+        unscheduleJob(deletedSpecURI.toString());
+      } else {
+        _log.warn(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please "
+                + "clean it manually", deletedSpecURI));
+      }
     } catch (JobException e) {
       _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning",
           deletedSpecURI), e);