You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/30 17:51:01 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1017] Deprecate FlowStatus in favor of FlowExecution and add endpoint to kill flows
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 978e339 [GOBBLIN-1017] Deprecate FlowStatus in favor of FlowExecution and add endpoint to kill flows
978e339 is described below
commit 978e339a5ecc17fb939bacae48e6e65a5198154b
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Jan 30 09:50:54 2020 -0800
[GOBBLIN-1017] Deprecate FlowStatus in favor of FlowExecution and add endpoint to kill flows
Closes #2862 from jack-moseley/flow-execution
---
...che.gobblin.service.flowconfigsV2.restspec.json | 2 +-
...he.gobblin.service.flowexecutions.restspec.json | 40 +++++++++
.../{FlowStatus.pdsc => FlowExecution.pdsc} | 4 +-
.../org/apache/gobblin/service/FlowStatus.pdsc | 1 +
...che.gobblin.service.flowconfigsV2.snapshot.json | 2 +-
...e.gobblin.service.flowexecutions.snapshot.json} | 39 +++++----
...ache.gobblin.service.flowstatuses.snapshot.json | 19 +++--
...wStatusClient.java => FlowExecutionClient.java} | 84 +++++++++----------
.../apache/gobblin/service/FlowStatusClient.java | 14 ++--
...tusResource.java => FlowExecutionResource.java} | 84 ++++++++++++-------
.../apache/gobblin/service/FlowStatusResource.java | 97 ++--------------------
.../service/monitoring/FlowStatusGenerator.java | 9 ++
.../gobblin/service/monitoring/KillFlowEvent.java | 30 +++++++
.../modules/core/GobblinServiceManager.java | 2 +
.../service/modules/orchestration/DagManager.java | 29 +++++--
15 files changed, 253 insertions(+), 203 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index 0127c42..ce0714c 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -10,7 +10,7 @@
"type" : "org.apache.gobblin.service.FlowId",
"params" : "org.apache.gobblin.service.FlowStatusId"
},
- "supports" : [ "create", "delete", "get", "update", "partial_update" ],
+ "supports" : [ "create", "delete", "get", "partial_update", "update" ],
"methods" : [ {
"annotations" : {
"returnEntity" : { }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
new file mode 100644
index 0000000..210e2b0
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -0,0 +1,40 @@
+{
+ "name" : "flowexecutions",
+ "namespace" : "org.apache.gobblin.service",
+ "path" : "/flowexecutions",
+ "schema" : "org.apache.gobblin.service.FlowExecution",
+ "doc" : "Resource for handling flow execution requests\n\ngenerated from: org.apache.gobblin.service.FlowExecutionResource",
+ "collection" : {
+ "identifier" : {
+ "name" : "id",
+ "type" : "org.apache.gobblin.service.FlowStatusId",
+ "params" : "com.linkedin.restli.common.EmptyRecord"
+ },
+ "supports" : [ "delete", "get" ],
+ "methods" : [ {
+ "method" : "get",
+ "doc" : "Retrieve the FlowExecution with the given key"
+ }, {
+ "method" : "delete",
+ "doc" : "Kill the FlowExecution with the given key"
+ } ],
+ "finders" : [ {
+ "name" : "latestFlowExecution",
+ "parameters" : [ {
+ "name" : "flowId",
+ "type" : "org.apache.gobblin.service.FlowId"
+ }, {
+ "name" : "count",
+ "type" : "int",
+ "optional" : true
+ }, {
+ "name" : "tag",
+ "type" : "string",
+ "optional" : true
+ } ]
+ } ],
+ "entity" : {
+ "path" : "/flowexecutions/{id}"
+ }
+ }
+}
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowExecution.pdsc
similarity index 90%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowExecution.pdsc
index cd2ed2b..2736366 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowExecution.pdsc
@@ -1,8 +1,8 @@
{
"type" : "record",
- "name" : "FlowStatus",
+ "name" : "FlowExecution",
"namespace" : "org.apache.gobblin.service",
- "doc" : "Status of a flow",
+ "doc" : "Represents an execution of a flow",
"fields" : [
{
"name" : "id",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
index cd2ed2b..165100f 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
@@ -3,6 +3,7 @@
"name" : "FlowStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Status of a flow",
+ "deprecated" : "Use FlowExecution instead",
"fields" : [
{
"name" : "id",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 359e6bc..50064ca 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -108,7 +108,7 @@
"type" : "org.apache.gobblin.service.FlowId",
"params" : "org.apache.gobblin.service.FlowStatusId"
},
- "supports" : [ "create", "delete", "get", "update", "partial_update" ],
+ "supports" : [ "create", "delete", "get", "partial_update", "update" ],
"methods" : [ {
"annotations" : {
"returnEntity" : { }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
similarity index 86%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 8b60aa1..895d706 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -38,14 +38,14 @@
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
- "COMPILED":"Flow compiled to jobs.",
- "PENDING":"Flow or job is in pending state.",
- "PENDING_RETRY":"Flow or job is pending retry.",
- "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
- "RUNNING": "Flow or job is currently executing",
- "COMPLETE":"Flow or job completed execution",
- "FAILED":"Flow or job failed",
- "CANCELLED":"Flow cancelled."
+ "COMPILED" : "Flow compiled to jobs.",
+ "PENDING" : "Flow or job is in pending state.",
+ "PENDING_RETRY" : "Flow or job is pending retry.",
+ "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+ "RUNNING" : "Flow or job is currently executing",
+ "COMPLETE" : "Flow or job completed execution",
+ "FAILED" : "Flow or job failed",
+ "CANCELLED" : "Flow cancelled."
}
}, {
"type" : "record",
@@ -168,9 +168,9 @@
} ]
}, {
"type" : "record",
- "name" : "FlowStatus",
+ "name" : "FlowExecution",
"namespace" : "org.apache.gobblin.service",
- "doc" : "Status of a flow",
+ "doc" : "Represents an execution of a flow",
"fields" : [ {
"name" : "id",
"type" : "FlowStatusId",
@@ -206,24 +206,27 @@
}
} ],
"schema" : {
- "name" : "flowstatuses",
+ "name" : "flowexecutions",
"namespace" : "org.apache.gobblin.service",
- "path" : "/flowstatuses",
- "schema" : "org.apache.gobblin.service.FlowStatus",
- "doc" : "Resource for handling flow status requests\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource",
+ "path" : "/flowexecutions",
+ "schema" : "org.apache.gobblin.service.FlowExecution",
+ "doc" : "Resource for handling flow execution requests\n\ngenerated from: org.apache.gobblin.service.FlowExecutionResource",
"collection" : {
"identifier" : {
"name" : "id",
"type" : "org.apache.gobblin.service.FlowStatusId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
- "supports" : [ "get" ],
+ "supports" : [ "delete", "get" ],
"methods" : [ {
"method" : "get",
- "doc" : "Retrieve the FlowStatus with the given key"
+ "doc" : "Retrieve the FlowExecution with the given key"
+ }, {
+ "method" : "delete",
+ "doc" : "Kill the FlowExecution with the given key"
} ],
"finders" : [ {
- "name" : "latestFlowStatus",
+ "name" : "latestFlowExecution",
"parameters" : [ {
"name" : "flowId",
"type" : "org.apache.gobblin.service.FlowId"
@@ -238,7 +241,7 @@
} ]
} ],
"entity" : {
- "path" : "/flowstatuses/{id}"
+ "path" : "/flowexecutions/{id}"
}
}
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 8b60aa1..e3b171f 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -38,14 +38,14 @@
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
- "COMPILED":"Flow compiled to jobs.",
- "PENDING":"Flow or job is in pending state.",
- "PENDING_RETRY":"Flow or job is pending retry.",
- "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
- "RUNNING": "Flow or job is currently executing",
- "COMPLETE":"Flow or job completed execution",
- "FAILED":"Flow or job failed",
- "CANCELLED":"Flow cancelled."
+ "COMPILED" : "Flow compiled to jobs.",
+ "PENDING" : "Flow or job is in pending state.",
+ "PENDING_RETRY" : "Flow or job is pending retry.",
+ "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+ "RUNNING" : "Flow or job is currently executing",
+ "COMPLETE" : "Flow or job completed execution",
+ "FAILED" : "Flow or job failed",
+ "CANCELLED" : "Flow cancelled."
}
}, {
"type" : "record",
@@ -194,7 +194,8 @@
"items" : "JobStatus"
},
"doc" : "Status of jobs belonging to the flow"
- } ]
+ } ],
+ "deprecated" : "Use FlowExecution instead"
}, {
"type" : "record",
"name" : "EmptyRecord",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
similarity index 57%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index 3da1547..5cbd0f0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -43,115 +43,115 @@ import com.linkedin.restli.common.EmptyRecord;
/**
- * Flow status client for REST flow status server
+ * Flow execution client for REST flow execution server
*/
-public class FlowStatusClient implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(FlowStatusClient.class);
+public class FlowExecutionClient implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionClient.class);
private Optional<HttpClientFactory> _httpClientFactory;
private Optional<RestClient> _restClient;
- private final FlowstatusesRequestBuilders _flowstatusesRequestBuilders;
+ private final FlowexecutionsRequestBuilders _flowexecutionsRequestBuilders;
/**
- * Construct a {@link FlowStatusClient} to communicate with http flow status server at URI serverUri
+ * Construct a {@link FlowExecutionClient} to communicate with http flow execution server at URI serverUri
* @param serverUri address and port of the REST server
*/
- public FlowStatusClient(String serverUri) {
- LOG.debug("FlowConfigClient with serverUri " + serverUri);
+ public FlowExecutionClient(String serverUri) {
+ LOG.debug("FlowExecutionClient with serverUri " + serverUri);
_httpClientFactory = Optional.of(new HttpClientFactory());
Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap()));
_restClient = Optional.of(new RestClient(r2Client, serverUri));
- _flowstatusesRequestBuilders = createRequestBuilders();
+ _flowexecutionsRequestBuilders = createRequestBuilders();
}
/**
- * Construct a {@link FlowStatusClient} to communicate with http flow status server at URI serverUri
+ * Construct a {@link FlowExecutionClient} to communicate with http flow execution server at URI serverUri
* @param restClient restClient to send restli request
*/
- public FlowStatusClient(RestClient restClient) {
- LOG.debug("FlowConfigClient with restClient " + restClient);
+ public FlowExecutionClient(RestClient restClient) {
+ LOG.debug("FlowExecutionClient with restClient " + restClient);
_httpClientFactory = Optional.absent();
_restClient = Optional.of(restClient);
- _flowstatusesRequestBuilders = createRequestBuilders();
+ _flowexecutionsRequestBuilders = createRequestBuilders();
}
- protected FlowstatusesRequestBuilders createRequestBuilders() {
- return new FlowstatusesRequestBuilders();
+ protected FlowexecutionsRequestBuilders createRequestBuilders() {
+ return new FlowexecutionsRequestBuilders();
}
/**
- * Get a flow status
- * @param flowStatusId identifier of flow status to get
- * @return a {@link FlowStatus} with the flow status
+ * Get a flow execution
+ * @param flowStatusId identifier of flow execution to get
+ * @return a {@link FlowExecution} with the flow execution
* @throws RemoteInvocationException
*/
- public FlowStatus getFlowStatus(FlowStatusId flowStatusId)
+ public FlowExecution getFlowExecution(FlowStatusId flowStatusId)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowStatusId.getFlowGroup() + " flowName " +
+ LOG.debug("getFlowExecution with groupName " + flowStatusId.getFlowGroup() + " flowName " +
flowStatusId.getFlowName());
- GetRequest<FlowStatus> getRequest = _flowstatusesRequestBuilders.get()
+ GetRequest<FlowExecution> getRequest = _flowexecutionsRequestBuilders.get()
.id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord())).build();
- Response<FlowStatus> response =
+ Response<FlowExecution> response =
_restClient.get().sendRequest(getRequest).getResponse();
return response.getEntity();
}
/**
- * Get the latest flow status
- * @param flowId identifier of flow status to get
- * @return a {@link FlowStatus} with the flow status
+ * Get the latest flow execution
+ * @param flowId identifier of flow execution to get
+ * @return a {@link FlowExecution}
* @throws RemoteInvocationException
*/
- public FlowStatus getLatestFlowStatus(FlowId flowId)
+ public FlowExecution getLatestFlowExecution(FlowId flowId)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+ LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName());
- FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).build();
+ FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId).build();
- Response<CollectionResponse<FlowStatus>> response =
+ Response<CollectionResponse<FlowExecution>> response =
_restClient.get().sendRequest(findRequest).getResponse();
- List<FlowStatus> flowStatusList = response.getEntity().getElements();
+ List<FlowExecution> flowExecutionList = response.getEntity().getElements();
- if (flowStatusList.isEmpty()) {
+ if (flowExecutionList.isEmpty()) {
return null;
} else {
- Preconditions.checkArgument(flowStatusList.size() == 1);
- return flowStatusList.get(0);
+ Preconditions.checkArgument(flowExecutionList.size() == 1);
+ return flowExecutionList.get(0);
}
}
/**
- * Get the latest flow status
- * @param flowId identifier of flow status to get
- * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, containing only
+ * Get the latest k flow executions
+ * @param flowId identifier of flow execution to get
+ * @return a list of {@link FlowExecution}es corresponding to the latest <code>count</code> executions, containing only
* jobStatuses that match the given tag.
* @throws RemoteInvocationException
*/
- public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count, String tag)
+ public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+ LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName() + " count " + Integer.toString(count));
- FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
+ FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId).
addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build();
- Response<CollectionResponse<FlowStatus>> response =
+ Response<CollectionResponse<FlowExecution>> response =
_restClient.get().sendRequest(findRequest).getResponse();
- List<FlowStatus> flowStatusList = response.getEntity().getElements();
+ List<FlowExecution> flowExecutionList = response.getEntity().getElements();
- if (flowStatusList.isEmpty()) {
+ if (flowExecutionList.isEmpty()) {
return null;
} else {
- return flowStatusList;
+ return flowExecutionList;
}
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
index 3da1547..def8a5a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
@@ -44,7 +44,9 @@ import com.linkedin.restli.common.EmptyRecord;
/**
* Flow status client for REST flow status server
+ * @deprecated Use {@link FlowExecutionClient}
*/
+@Deprecated
public class FlowStatusClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FlowStatusClient.class);
@@ -57,7 +59,7 @@ public class FlowStatusClient implements Closeable {
* @param serverUri address and port of the REST server
*/
public FlowStatusClient(String serverUri) {
- LOG.debug("FlowConfigClient with serverUri " + serverUri);
+ LOG.debug("FlowStatusClient with serverUri " + serverUri);
_httpClientFactory = Optional.of(new HttpClientFactory());
Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap()));
@@ -71,7 +73,7 @@ public class FlowStatusClient implements Closeable {
* @param restClient restClient to send restli request
*/
public FlowStatusClient(RestClient restClient) {
- LOG.debug("FlowConfigClient with restClient " + restClient);
+ LOG.debug("FlowStatusClient with restClient " + restClient);
_httpClientFactory = Optional.absent();
_restClient = Optional.of(restClient);
@@ -91,7 +93,7 @@ public class FlowStatusClient implements Closeable {
*/
public FlowStatus getFlowStatus(FlowStatusId flowStatusId)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowStatusId.getFlowGroup() + " flowName " +
+ LOG.debug("getFlowStatus with groupName " + flowStatusId.getFlowGroup() + " flowName " +
flowStatusId.getFlowName());
GetRequest<FlowStatus> getRequest = _flowstatusesRequestBuilders.get()
@@ -110,7 +112,7 @@ public class FlowStatusClient implements Closeable {
*/
public FlowStatus getLatestFlowStatus(FlowId flowId)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+ LOG.debug("getFlowStatus with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName());
FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).build();
@@ -129,7 +131,7 @@ public class FlowStatusClient implements Closeable {
}
/**
- * Get the latest flow status
+ * Get the latest k flow statuses
* @param flowId identifier of flow status to get
* @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, containing only
* jobStatuses that match the given tag.
@@ -137,7 +139,7 @@ public class FlowStatusClient implements Closeable {
*/
public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count, String tag)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+ LOG.debug("getFlowStatus with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName() + " count " + Integer.toString(count));
FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
similarity index 68%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 767efa4..960415d 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -29,7 +29,9 @@ import com.google.inject.Inject;
import com.linkedin.data.template.SetMode;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Context;
import com.linkedin.restli.server.annotations.Finder;
import com.linkedin.restli.server.annotations.Optional;
@@ -42,64 +44,84 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever;
/**
- * Resource for handling flow status requests
+ * Resource for handling flow execution requests
*/
-@RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
-public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
- private static final Logger LOG = LoggerFactory.getLogger(FlowStatusResource.class);
+@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
+public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionResource.class);
public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
public static final String MESSAGE_SEPARATOR = ", ";
@Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
FlowStatusGenerator _flowStatusGenerator;
- public FlowStatusResource() {}
+ public FlowExecutionResource() {}
/**
- * Retrieve the FlowStatus with the given key
- * @param key flow status id key containing group name and flow name
- * @return {@link FlowStatus} with flow status for the latest execution of the flow
+ * Retrieve the FlowExecution with the given key
+ * @param key {@link FlowStatusId} of flow to get
+ * @return corresponding {@link FlowExecution}
*/
@Override
- public FlowStatus get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+ public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+ // this returns null to raise a 404 error if flowStatus is null
+ return convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+ }
+
+ @Finder("latestFlowExecution")
+ public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context,
+ @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
+
+ if (flowStatuses != null) {
+ return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
+ }
+
+ // will return 404 status code
+ return null;
+ }
+
+ /**
+ * Kill the FlowExecution with the given key
+ * @param key {@link FlowStatusId} of flow to kill
+ * @return {@link UpdateResponse}
+ */
+ @Override
+ public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ Long flowExecutionId = key.getKey().getFlowExecutionId();
+ _flowStatusGenerator.killFlow(flowGroup, flowName, flowExecutionId);
+ return new UpdateResponse(HttpStatus.S_200_OK);
+ }
+
+ public static org.apache.gobblin.service.monitoring.FlowStatus getFlowStatusFromGenerator(ComplexResourceKey<FlowStatusId, EmptyRecord> key,
+ FlowStatusGenerator flowStatusGenerator) {
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
long flowExecutionId = key.getKey().getFlowExecutionId();
LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
- org.apache.gobblin.service.monitoring.FlowStatus flowStatus =
- _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
-
- // this returns null to raise a 404 error if flowStatus is null
- return convertFlowStatus(flowStatus);
+ return flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
}
- @Finder("latestFlowStatus")
- public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
- @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
+ public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
+ Integer count, String tag, FlowStatusGenerator flowStatusGenerator) {
if (count == null) {
count = 1;
}
- LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
-
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
- _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
+ LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
- if (flowStatuses != null) {
- return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
- }
-
- // will return 404 status code
- return null;
+ return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
}
/**
- * Forms a {@link org.apache.gobblin.service.FlowStatus} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+ * Forms a {@link FlowExecution} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
* @param monitoringFlowStatus
- * @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+ * @return a {@link FlowExecution} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
*/
- private FlowStatus convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
+ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
if (monitoringFlowStatus == null) {
return null;
}
@@ -151,7 +173,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
flowMessagesStringBuffer.substring(0, flowMessagesStringBuffer.length() -
MESSAGE_SEPARATOR.length()) : StringUtils.EMPTY;
- return new FlowStatus()
+ return new FlowExecution()
.setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
.setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
.setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 767efa4..6047b24 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -17,16 +17,13 @@
package org.apache.gobblin.service;
-import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-import com.linkedin.data.template.SetMode;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.server.PagingContext;
@@ -38,7 +35,6 @@ import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
/**
@@ -46,7 +42,6 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever;
*/
@RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
- private static final Logger LOG = LoggerFactory.getLogger(FlowStatusResource.class);
public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
public static final String MESSAGE_SEPARATOR = ", ";
@@ -62,29 +57,14 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
*/
@Override
public FlowStatus get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- long flowExecutionId = key.getKey().getFlowExecutionId();
-
- LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
-
- org.apache.gobblin.service.monitoring.FlowStatus flowStatus =
- _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
-
// this returns null to raise a 404 error if flowStatus is null
- return convertFlowStatus(flowStatus);
+ return convertFlowStatus(FlowExecutionResource.getFlowStatusFromGenerator(key, this._flowStatusGenerator));
}
@Finder("latestFlowStatus")
public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
@QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
- if (count == null) {
- count = 1;
- }
- LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
-
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
- _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
if (flowStatuses != null) {
return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
@@ -96,77 +76,18 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
/**
* Forms a {@link org.apache.gobblin.service.FlowStatus} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+ * Logic is used from {@link FlowExecutionResource} since this class is deprecated
- * @param monitoringFlowStatus
- * @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+ * @param monitoringFlowStatus the monitoring-layer flow status to convert; may be {@code null}
+ * @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus},
+ * or {@code null} if {@code monitoringFlowStatus} is {@code null}
*/
private FlowStatus convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
+ // The null guard must stay: FlowExecutionResource.convertFlowStatus returns null for a null input, so the
+ // getters below would throw a NullPointerException, and get() relies on a null return to raise a 404.
if (monitoringFlowStatus == null) {
return null;
}
-
- Iterator<org.apache.gobblin.service.monitoring.JobStatus> jobStatusIter = monitoringFlowStatus.getJobStatusIterator();
- JobStatusArray jobStatusArray = new JobStatusArray();
- FlowId flowId = new FlowId().setFlowName(monitoringFlowStatus.getFlowName())
- .setFlowGroup(monitoringFlowStatus.getFlowGroup());
-
- long flowEndTime = 0L;
- ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
- StringBuffer flowMessagesStringBuffer = new StringBuffer();
-
- while (jobStatusIter.hasNext()) {
- org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
-
- // Check if this is the flow status instead of a single job status
- if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
- flowEndTime = queriedJobStatus.getEndTime();
- flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
- continue;
- }
-
- JobStatus jobStatus = new JobStatus();
-
- jobStatus.setFlowId(flowId)
- .setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
- .setJobGroup(queriedJobStatus.getJobGroup()))
- .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
- .setExecutionStatistics(new JobStatistics()
- .setExecutionStartTime(queriedJobStatus.getStartTime())
- .setExecutionEndTime(queriedJobStatus.getEndTime())
- .setProcessedCount(queriedJobStatus.getProcessedCount()))
- .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
- .setMessage(queriedJobStatus.getMessage())
- .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
- setHighWatermark(queriedJobStatus.getHighWatermark()));
-
- jobStatusArray.add(jobStatus);
-
- if (!queriedJobStatus.getMessage().isEmpty()) {
- flowMessagesStringBuffer.append(queriedJobStatus.getMessage());
- flowMessagesStringBuffer.append(MESSAGE_SEPARATOR);
- }
- }
-
- String flowMessages = flowMessagesStringBuffer.length() > 0 ?
- flowMessagesStringBuffer.substring(0, flowMessagesStringBuffer.length() -
- MESSAGE_SEPARATOR.length()) : StringUtils.EMPTY;
-
+ FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(monitoringFlowStatus);
return new FlowStatus()
- .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
- .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
- .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
- .setExecutionEndTime(flowEndTime))
- .setMessage(flowMessages)
- .setExecutionStatus(flowExecutionStatus)
- .setJobStatuses(jobStatusArray);
- }
-
- /**
- * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
- * assumed to be the flow start time.
- */
- private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
- return flowStatus.getFlowExecutionId();
+ .setId(flowExecution.getId())
+ .setExecutionStatistics(flowExecution.getExecutionStatistics())
+ .setMessage(flowExecution.getMessage())
+ .setExecutionStatus(flowExecution.getExecutionStatus())
+ .setJobStatuses(flowExecution.getJobStatuses());
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index c757485..a7a451d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
import lombok.Builder;
@@ -38,6 +39,7 @@ public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
private final JobStatusRetriever jobStatusRetriever;
+ private final EventBus eventBus;
/**
* Get the flow statuses of last <code>count</code> (or fewer) executions
@@ -120,4 +122,11 @@ public class FlowStatusGenerator {
String status = jobStatus.getEventName().toUpperCase();
return !FINISHED_STATUSES.contains(status);
}
+
+ /**
+ * Send kill request for the given flow by posting a {@link KillFlowEvent} on this generator's {@link EventBus}.
+ * This method only publishes the event and returns nothing, so the caller gets no indication of whether any
+ * subscriber received the request or whether the flow was actually cancelled.
+ *
+ * @param flowGroup group of the flow to kill
+ * @param flowName name of the flow to kill
+ * @param flowExecutionId execution id of the flow run to kill
+ */
+ public void killFlow(String flowGroup, String flowName, Long flowExecutionId) {
+ this.eventBus.post(new KillFlowEvent(flowGroup, flowName, flowExecutionId));
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/KillFlowEvent.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/KillFlowEvent.java
new file mode 100644
index 0000000..ae0458e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/KillFlowEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+
+/**
+ * Event describing a request to kill a single flow execution, identified by its flow group, flow name and
+ * flow execution id. Instances are created and posted by {@link FlowStatusGenerator#killFlow}.
+ */
+@AllArgsConstructor
+@Data
+public class KillFlowEvent {
+ private String flowGroup;
+ private String flowName;
+ // Boxed Long, so consumers that need a primitive id must be prepared for a null value
+ private Long flowExecutionId;
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 0818b1d..5cd3bc2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -118,6 +118,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
private volatile boolean stopInProgress = false;
// An EventBus used for communications between services running in the ApplicationMaster
+ @Getter
protected final EventBus eventBus = new EventBus(GobblinServiceManager.class.getSimpleName());
protected final FileSystem fs;
@@ -490,6 +491,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
if (this.isDagManagerEnabled) {
this.dagManager.setActive(true);
+ this.eventBus.register(this.dagManager);
}
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 1dff94a..67b3e38 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -43,6 +43,7 @@ import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -75,6 +76,7 @@ import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -274,11 +276,28 @@ public class DagManager extends AbstractIdleService {
log.info("Found {} flows to cancel.", flowExecutionIds.size());
for (long flowExecutionId : flowExecutionIds) {
- int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
- String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
- if (!this.cancelQueue[queueId].offer(dagId)) {
- throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
- }
+ killFlow(flowGroup, flowName, flowExecutionId);
+ }
+ }
+
+ /**
+ * Add the specified flow to {@link DagManager#cancelQueue}
+ * The queue index is obtained from {@link DagManagerUtils#getDagQueueId} using the flow execution id and
+ * {@code numThreads}; the queued value is the dag id built by {@link DagManagerUtils#generateDagId}.
+ *
+ * @param flowGroup group of the flow to cancel
+ * @param flowName name of the flow to cancel
+ * @param flowExecutionId execution id of the flow run to cancel
+ * @throws IOException if the selected cancellation queue does not accept the dag id
+ */
+ private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
+ int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
+ String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
+ // offer() signals rejection through its boolean result, so turn a false return into an exception for callers
+ if (!this.cancelQueue[queueId].offer(dagId)) {
+ throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
+ }
+ }
+
+ /**
+ * Event bus subscriber that adds the flow named in a {@link KillFlowEvent} to the cancellation queue.
+ * Problems are logged instead of rethrown; a request without a flow execution id is ignored.
+ *
+ * @param killFlowEvent the kill request carrying flow group, flow name and flow execution id
+ */
+ @Subscribe
+ public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
+ String flowGroup = killFlowEvent.getFlowGroup();
+ String flowName = killFlowEvent.getFlowName();
+ Long flowExecutionId = killFlowEvent.getFlowExecutionId();
+ log.info("Received kill request for flow ({}, {}, {})", flowGroup, flowName, flowExecutionId);
+ // The event carries a boxed Long while killFlow takes a primitive long: unboxing a null id would throw a
+ // NullPointerException that the IOException handler below does not catch.
+ if (flowExecutionId == null) {
+ log.warn("Ignoring kill request for flow ({}, {}) because it has no flow execution id", flowGroup, flowName);
+ return;
+ }
+ try {
+ killFlow(flowGroup, flowName, flowExecutionId);
+ } catch (IOException e) {
+ // Identify the flow in the warning so the failure can be understood without the preceding info line
+ log.warn("Failed to kill flow ({}, {}, {})", flowGroup, flowName, flowExecutionId, e);
+ }
+ }