You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/30 23:55:19 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled codestyle changes

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c908962  [GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled codestyle changes
c908962 is described below

commit c908962e87cfd42c54da23f9160b460d277bb00e
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Apr 30 16:55:11 2020 -0700

    [GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled
    codestyle changes
    
    Closes #2974 from arjun4084346/deleteClusterJobs
---
 .../apache/gobblin/runtime/api/SpecExecutor.java   |  2 +-
 .../modules/orchestration/AzkabanSpecExecutor.java |  2 +-
 .../gobblin/service/SimpleKafkaSpecExecutor.java   |  2 +-
 .../AbstractSpecExecutor.java                      |  3 ++-
 .../InMemorySpecExecutor.java                      |  4 ++--
 .../spec_executorInstance/LocalFsSpecExecutor.java |  4 ++--
 .../spec_executorInstance/MockedSpecExecutor.java  |  4 ++--
 .../modules/orchestration/Orchestrator.java        | 22 +++++++++++++++++++++-
 .../modules/flow/MultiHopFlowCompilerTest.java     |  2 +-
 9 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index 85ee7af..8569275 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -53,7 +53,7 @@ public interface SpecExecutor {
 
   /** A communication socket for generating spec to assigned physical executors, paired with
    * a consumer on the physical executor side. */
-  Future<? extends SpecProducer> getProducer();
+  Future<? extends SpecProducer<Spec>> getProducer();
 
   public static enum Verb {
     ADD(1, "add"),
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
index 0e6c4a0..bb970d9 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
@@ -70,7 +70,7 @@ public class AzkabanSpecExecutor extends AbstractSpecExecutor {
 
 
   @Override
-  public Future<? extends SpecProducer> getProducer() {
+  public Future<? extends SpecProducer<Spec>> getProducer() {
     return new CompletedFuture<>(this.azkabanSpecProducer, null);
   }
 
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
index c3dfcb3..29e735e 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -58,7 +58,7 @@ public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
   }
 
   @Override
-  public Future<? extends SpecProducer> getProducer() {
+  public Future<? extends SpecProducer<Spec>> getProducer() {
     return new CompletedFuture<>(this.specProducer, null);
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
index e0a235c..ea8c497 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
@@ -31,6 +31,7 @@ import com.google.common.io.Closer;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecConsumer;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
@@ -182,7 +183,7 @@ public abstract class AbstractSpecExecutor extends AbstractIdleService implement
 
   abstract protected void shutDown() throws Exception;
 
-  abstract public Future<? extends SpecProducer> getProducer();
+  abstract public Future<? extends SpecProducer<Spec>> getProducer();
 
   abstract public Future<String> getDescription();
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
index e0be4e9..4f3c899 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
@@ -76,8 +76,8 @@ public class InMemorySpecExecutor extends AbstractSpecExecutor {
   }
 
   @Override
-  public Future<? extends SpecProducer> getProducer(){
-    return new CompletedFuture(this.inMemorySpecProducer, null);
+  public Future<? extends SpecProducer<Spec>> getProducer(){
+    return new CompletedFuture<>(this.inMemorySpecProducer, null);
   }
 
   @Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
index 4d49ed0..c6c9536 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
@@ -52,8 +52,8 @@ public class LocalFsSpecExecutor extends AbstractSpecExecutor {
   }
 
   @Override
-  public Future<? extends SpecProducer> getProducer(){
-    return new CompletedFuture(this.specProducer, null);
+  public Future<? extends SpecProducer<Spec>> getProducer(){
+    return new CompletedFuture<>(this.specProducer, null);
   }
 
   @Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 4a31a6e..5cfc53b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -54,7 +54,7 @@ public class MockedSpecExecutor extends InMemorySpecExecutor {
   }
 
   @Override
-  public Future<? extends SpecProducer> getProducer(){
-    return new CompletedFuture(this.mockedSpecProducer, null);
+  public Future<? extends SpecProducer<Spec>> getProducer(){
+    return new CompletedFuture<>(this.mockedSpecProducer, null);
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index cefd513..d545f9c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -351,7 +351,27 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
         this.dagManager.get().stopDag(spec.getUri());
       } else {
-        _log.warn("Operation not supported.");
+        // If DagManager is not enabled, we need to recompile the flow to find the spec producer.
+        // If the compilation result is different, the remove request can go to a different spec producer.
+        Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+        if (jobExecutionPlanDag.isEmpty()) {
+          _log.warn("Cannot determine an executor to delete Spec: " + spec);
+          return;
+        }
+
+        // Delete all compiled JobSpecs on their respective Executor
+        for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+          JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+          Spec jobSpec = jobExecutionPlan.getJobSpec();
+          try {
+            SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+            _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
+            producer.deleteSpec(jobSpec.getUri(), headers);
+          } catch (Exception e) {
+            _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
+          }
+        }
       }
     } else {
       throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index c03aa39..a99928e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -761,7 +761,7 @@ public class MultiHopFlowCompilerTest {
     }
 
     @Override
-    public Future<? extends SpecProducer> getProducer() {
+    public Future<? extends SpecProducer<Spec>> getProducer() {
       return new CompletedFuture<>(this.azkabanSpecProducer, null);
     }