You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/30 23:55:19 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1135] added
back flow remove feature for spec executors when dag manager is not enabled
codesyle changes
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c908962 [GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled codesyle changes
c908962 is described below
commit c908962e87cfd42c54da23f9160b460d277bb00e
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Apr 30 16:55:11 2020 -0700
[GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled
codesyle changes
Closes #2974 from arjun4084346/deleteClusterJobs
---
.../apache/gobblin/runtime/api/SpecExecutor.java | 2 +-
.../modules/orchestration/AzkabanSpecExecutor.java | 2 +-
.../gobblin/service/SimpleKafkaSpecExecutor.java | 2 +-
.../AbstractSpecExecutor.java | 3 ++-
.../InMemorySpecExecutor.java | 4 ++--
.../spec_executorInstance/LocalFsSpecExecutor.java | 4 ++--
.../spec_executorInstance/MockedSpecExecutor.java | 4 ++--
.../modules/orchestration/Orchestrator.java | 22 +++++++++++++++++++++-
.../modules/flow/MultiHopFlowCompilerTest.java | 2 +-
9 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index 85ee7af..8569275 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -53,7 +53,7 @@ public interface SpecExecutor {
/** A communication socket for generating spec to assigned physical executors, paired with
* a consumer on the physical executor side. */
- Future<? extends SpecProducer> getProducer();
+ Future<? extends SpecProducer<Spec>> getProducer();
public static enum Verb {
ADD(1, "add"),
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
index 0e6c4a0..bb970d9 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
@@ -70,7 +70,7 @@ public class AzkabanSpecExecutor extends AbstractSpecExecutor {
@Override
- public Future<? extends SpecProducer> getProducer() {
+ public Future<? extends SpecProducer<Spec>> getProducer() {
return new CompletedFuture<>(this.azkabanSpecProducer, null);
}
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
index c3dfcb3..29e735e 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -58,7 +58,7 @@ public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
}
@Override
- public Future<? extends SpecProducer> getProducer() {
+ public Future<? extends SpecProducer<Spec>> getProducer() {
return new CompletedFuture<>(this.specProducer, null);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
index e0a235c..ea8c497 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
@@ -31,6 +31,7 @@ import com.google.common.io.Closer;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
@@ -182,7 +183,7 @@ public abstract class AbstractSpecExecutor extends AbstractIdleService implement
abstract protected void shutDown() throws Exception;
- abstract public Future<? extends SpecProducer> getProducer();
+ abstract public Future<? extends SpecProducer<Spec>> getProducer();
abstract public Future<String> getDescription();
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
index e0be4e9..4f3c899 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
@@ -76,8 +76,8 @@ public class InMemorySpecExecutor extends AbstractSpecExecutor {
}
@Override
- public Future<? extends SpecProducer> getProducer(){
- return new CompletedFuture(this.inMemorySpecProducer, null);
+ public Future<? extends SpecProducer<Spec>> getProducer(){
+ return new CompletedFuture<>(this.inMemorySpecProducer, null);
}
@Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
index 4d49ed0..c6c9536 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
@@ -52,8 +52,8 @@ public class LocalFsSpecExecutor extends AbstractSpecExecutor {
}
@Override
- public Future<? extends SpecProducer> getProducer(){
- return new CompletedFuture(this.specProducer, null);
+ public Future<? extends SpecProducer<Spec>> getProducer(){
+ return new CompletedFuture<>(this.specProducer, null);
}
@Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 4a31a6e..5cfc53b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -54,7 +54,7 @@ public class MockedSpecExecutor extends InMemorySpecExecutor {
}
@Override
- public Future<? extends SpecProducer> getProducer(){
- return new CompletedFuture(this.mockedSpecProducer, null);
+ public Future<? extends SpecProducer<Spec>> getProducer(){
+ return new CompletedFuture<>(this.mockedSpecProducer, null);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index cefd513..d545f9c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -351,7 +351,27 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
_log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
this.dagManager.get().stopDag(spec.getUri());
} else {
- _log.warn("Operation not supported.");
+ // If DagManager is not enabled, we need to recompile the flow to find the spec producer,
+ // If compilation results is different, it remove request can go to some different spec producer
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (jobExecutionPlanDag.isEmpty()) {
+ _log.warn("Cannot determine an executor to delete Spec: " + spec);
+ return;
+ }
+
+ // Delete all compiled JobSpecs on their respective Executor
+ for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+ Spec jobSpec = jobExecutionPlan.getJobSpec();
+ try {
+ SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+ _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
+ producer.deleteSpec(jobSpec.getUri(), headers);
+ } catch (Exception e) {
+ _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
+ }
+ }
}
} else {
throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index c03aa39..a99928e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -761,7 +761,7 @@ public class MultiHopFlowCompilerTest {
}
@Override
- public Future<? extends SpecProducer> getProducer() {
+ public Future<? extends SpecProducer<Spec>> getProducer() {
return new CompletedFuture<>(this.azkabanSpecProducer, null);
}