You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/30 17:51:01 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1017] Deprecate FlowStatus in favor of FlowExecution and add endpoint to kill flows

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 978e339  [GOBBLIN-1017] Deprecate FlowStatus in favor of FlowExecution and add endpoint to kill flows
978e339 is described below

commit 978e339a5ecc17fb939bacae48e6e65a5198154b
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Jan 30 09:50:54 2020 -0800

    [GOBBLIN-1017] Deprecate FlowStatus in favor of FlowExecution and add endpoint to kill flows
    
    Closes #2862 from jack-moseley/flow-execution
---
 ...che.gobblin.service.flowconfigsV2.restspec.json |  2 +-
 ...he.gobblin.service.flowexecutions.restspec.json | 40 +++++++++
 .../{FlowStatus.pdsc => FlowExecution.pdsc}        |  4 +-
 .../org/apache/gobblin/service/FlowStatus.pdsc     |  1 +
 ...che.gobblin.service.flowconfigsV2.snapshot.json |  2 +-
 ...e.gobblin.service.flowexecutions.snapshot.json} | 39 +++++----
 ...ache.gobblin.service.flowstatuses.snapshot.json | 19 +++--
 ...wStatusClient.java => FlowExecutionClient.java} | 84 +++++++++----------
 .../apache/gobblin/service/FlowStatusClient.java   | 14 ++--
 ...tusResource.java => FlowExecutionResource.java} | 84 ++++++++++++-------
 .../apache/gobblin/service/FlowStatusResource.java | 97 ++--------------------
 .../service/monitoring/FlowStatusGenerator.java    |  9 ++
 .../gobblin/service/monitoring/KillFlowEvent.java  | 30 +++++++
 .../modules/core/GobblinServiceManager.java        |  2 +
 .../service/modules/orchestration/DagManager.java  | 29 +++++--
 15 files changed, 253 insertions(+), 203 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index 0127c42..ce0714c 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -10,7 +10,7 @@
       "type" : "org.apache.gobblin.service.FlowId",
       "params" : "org.apache.gobblin.service.FlowStatusId"
     },
-    "supports" : [ "create", "delete", "get", "update", "partial_update" ],
+    "supports" : [ "create", "delete", "get", "partial_update", "update" ],
     "methods" : [ {
       "annotations" : {
         "returnEntity" : { }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
new file mode 100644
index 0000000..210e2b0
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -0,0 +1,40 @@
+{
+  "name" : "flowexecutions",
+  "namespace" : "org.apache.gobblin.service",
+  "path" : "/flowexecutions",
+  "schema" : "org.apache.gobblin.service.FlowExecution",
+  "doc" : "Resource for handling flow execution requests\n\ngenerated from: org.apache.gobblin.service.FlowExecutionResource",
+  "collection" : {
+    "identifier" : {
+      "name" : "id",
+      "type" : "org.apache.gobblin.service.FlowStatusId",
+      "params" : "com.linkedin.restli.common.EmptyRecord"
+    },
+    "supports" : [ "delete", "get" ],
+    "methods" : [ {
+      "method" : "get",
+      "doc" : "Retrieve the FlowExecution with the given key"
+    }, {
+      "method" : "delete",
+      "doc" : "Kill the FlowExecution with the given key"
+    } ],
+    "finders" : [ {
+      "name" : "latestFlowExecution",
+      "parameters" : [ {
+        "name" : "flowId",
+        "type" : "org.apache.gobblin.service.FlowId"
+      }, {
+        "name" : "count",
+        "type" : "int",
+        "optional" : true
+      }, {
+        "name" : "tag",
+        "type" : "string",
+        "optional" : true
+      } ]
+    } ],
+    "entity" : {
+      "path" : "/flowexecutions/{id}"
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowExecution.pdsc
similarity index 90%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowExecution.pdsc
index cd2ed2b..2736366 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowExecution.pdsc
@@ -1,8 +1,8 @@
 {
   "type" : "record",
-  "name" : "FlowStatus",
+  "name" : "FlowExecution",
   "namespace" : "org.apache.gobblin.service",
-  "doc" : "Status of a flow",
+  "doc" : "Represents an execution of a flow",
   "fields" : [
     {
       "name" : "id",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
index cd2ed2b..165100f 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowStatus.pdsc
@@ -3,6 +3,7 @@
   "name" : "FlowStatus",
   "namespace" : "org.apache.gobblin.service",
   "doc" : "Status of a flow",
+  "deprecated" : "Use FlowExecution instead",
   "fields" : [
     {
       "name" : "id",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 359e6bc..50064ca 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -108,7 +108,7 @@
         "type" : "org.apache.gobblin.service.FlowId",
         "params" : "org.apache.gobblin.service.FlowStatusId"
       },
-      "supports" : [ "create", "delete", "get", "update", "partial_update" ],
+      "supports" : [ "create", "delete", "get", "partial_update", "update" ],
       "methods" : [ {
         "annotations" : {
           "returnEntity" : { }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
similarity index 86%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 8b60aa1..895d706 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -38,14 +38,14 @@
     "doc" : "Execution status for a flow or job",
     "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
-      "COMPILED":"Flow compiled to jobs.",
-      "PENDING":"Flow or job is in pending state.",
-      "PENDING_RETRY":"Flow or job is pending retry.",
-      "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
-      "RUNNING": "Flow or job is currently executing",
-      "COMPLETE":"Flow or job completed execution",
-      "FAILED":"Flow or job failed",
-      "CANCELLED":"Flow cancelled."
+      "COMPILED" : "Flow compiled to jobs.",
+      "PENDING" : "Flow or job is in pending state.",
+      "PENDING_RETRY" : "Flow or job is pending retry.",
+      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+      "RUNNING" : "Flow or job is currently executing",
+      "COMPLETE" : "Flow or job completed execution",
+      "FAILED" : "Flow or job failed",
+      "CANCELLED" : "Flow cancelled."
     }
   }, {
     "type" : "record",
@@ -168,9 +168,9 @@
     } ]
   }, {
     "type" : "record",
-    "name" : "FlowStatus",
+    "name" : "FlowExecution",
     "namespace" : "org.apache.gobblin.service",
-    "doc" : "Status of a flow",
+    "doc" : "Represents an execution of a flow",
     "fields" : [ {
       "name" : "id",
       "type" : "FlowStatusId",
@@ -206,24 +206,27 @@
     }
   } ],
   "schema" : {
-    "name" : "flowstatuses",
+    "name" : "flowexecutions",
     "namespace" : "org.apache.gobblin.service",
-    "path" : "/flowstatuses",
-    "schema" : "org.apache.gobblin.service.FlowStatus",
-    "doc" : "Resource for handling flow status requests\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource",
+    "path" : "/flowexecutions",
+    "schema" : "org.apache.gobblin.service.FlowExecution",
+    "doc" : "Resource for handling flow execution requests\n\ngenerated from: org.apache.gobblin.service.FlowExecutionResource",
     "collection" : {
       "identifier" : {
         "name" : "id",
         "type" : "org.apache.gobblin.service.FlowStatusId",
         "params" : "com.linkedin.restli.common.EmptyRecord"
       },
-      "supports" : [ "get" ],
+      "supports" : [ "delete", "get" ],
       "methods" : [ {
         "method" : "get",
-        "doc" : "Retrieve the FlowStatus with the given key"
+        "doc" : "Retrieve the FlowExecution with the given key"
+      }, {
+        "method" : "delete",
+        "doc" : "Kill the FlowExecution with the given key"
       } ],
       "finders" : [ {
-        "name" : "latestFlowStatus",
+        "name" : "latestFlowExecution",
         "parameters" : [ {
           "name" : "flowId",
           "type" : "org.apache.gobblin.service.FlowId"
@@ -238,7 +241,7 @@
         } ]
       } ],
       "entity" : {
-        "path" : "/flowstatuses/{id}"
+        "path" : "/flowexecutions/{id}"
       }
     }
   }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 8b60aa1..e3b171f 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -38,14 +38,14 @@
     "doc" : "Execution status for a flow or job",
     "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
-      "COMPILED":"Flow compiled to jobs.",
-      "PENDING":"Flow or job is in pending state.",
-      "PENDING_RETRY":"Flow or job is pending retry.",
-      "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
-      "RUNNING": "Flow or job is currently executing",
-      "COMPLETE":"Flow or job completed execution",
-      "FAILED":"Flow or job failed",
-      "CANCELLED":"Flow cancelled."
+      "COMPILED" : "Flow compiled to jobs.",
+      "PENDING" : "Flow or job is in pending state.",
+      "PENDING_RETRY" : "Flow or job is pending retry.",
+      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+      "RUNNING" : "Flow or job is currently executing",
+      "COMPLETE" : "Flow or job completed execution",
+      "FAILED" : "Flow or job failed",
+      "CANCELLED" : "Flow cancelled."
     }
   }, {
     "type" : "record",
@@ -194,7 +194,8 @@
         "items" : "JobStatus"
       },
       "doc" : "Status of jobs belonging to the flow"
-    } ]
+    } ],
+    "deprecated" : "Use FlowExecution instead"
   }, {
     "type" : "record",
     "name" : "EmptyRecord",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
similarity index 57%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index 3da1547..5cbd0f0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -43,115 +43,115 @@ import com.linkedin.restli.common.EmptyRecord;
 
 
 /**
- * Flow status client for REST flow status server
+ * Flow execution client for REST flow execution server
  */
-public class FlowStatusClient implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(FlowStatusClient.class);
+public class FlowExecutionClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionClient.class);
 
   private Optional<HttpClientFactory> _httpClientFactory;
   private Optional<RestClient> _restClient;
-  private final FlowstatusesRequestBuilders _flowstatusesRequestBuilders;
+  private final FlowexecutionsRequestBuilders _flowexecutionsRequestBuilders;
 
   /**
-   * Construct a {@link FlowStatusClient} to communicate with http flow status server at URI serverUri
+   * Construct a {@link FlowExecutionClient} to communicate with http flow execution server at URI serverUri
    * @param serverUri address and port of the REST server
    */
-  public FlowStatusClient(String serverUri) {
-    LOG.debug("FlowConfigClient with serverUri " + serverUri);
+  public FlowExecutionClient(String serverUri) {
+    LOG.debug("FlowExecutionClient with serverUri " + serverUri);
 
     _httpClientFactory = Optional.of(new HttpClientFactory());
     Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap()));
     _restClient = Optional.of(new RestClient(r2Client, serverUri));
 
-    _flowstatusesRequestBuilders = createRequestBuilders();
+    _flowexecutionsRequestBuilders = createRequestBuilders();
   }
 
   /**
-   * Construct a {@link FlowStatusClient} to communicate with http flow status server at URI serverUri
+   * Construct a {@link FlowExecutionClient} to communicate with http flow execution server at URI serverUri
    * @param restClient restClient to send restli request
    */
-  public FlowStatusClient(RestClient restClient) {
-    LOG.debug("FlowConfigClient with restClient " + restClient);
+  public FlowExecutionClient(RestClient restClient) {
+    LOG.debug("FlowExecutionClient with restClient " + restClient);
 
     _httpClientFactory = Optional.absent();
     _restClient = Optional.of(restClient);
 
-    _flowstatusesRequestBuilders = createRequestBuilders();
+    _flowexecutionsRequestBuilders = createRequestBuilders();
   }
 
-  protected FlowstatusesRequestBuilders createRequestBuilders() {
-    return new FlowstatusesRequestBuilders();
+  protected FlowexecutionsRequestBuilders createRequestBuilders() {
+    return new FlowexecutionsRequestBuilders();
   }
 
   /**
-   * Get a flow status
-   * @param flowStatusId identifier of flow status to get
-   * @return a {@link FlowStatus} with the flow status
+   * Get a flow execution
+   * @param flowStatusId identifier of flow execution to get
+   * @return a {@link FlowExecution} with the flow execution
    * @throws RemoteInvocationException
    */
-  public FlowStatus getFlowStatus(FlowStatusId flowStatusId)
+  public FlowExecution getFlowExecution(FlowStatusId flowStatusId)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowStatusId.getFlowGroup() + " flowName " +
+    LOG.debug("getFlowExecution with groupName " + flowStatusId.getFlowGroup() + " flowName " +
         flowStatusId.getFlowName());
 
-    GetRequest<FlowStatus> getRequest = _flowstatusesRequestBuilders.get()
+    GetRequest<FlowExecution> getRequest = _flowexecutionsRequestBuilders.get()
         .id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord())).build();
 
-    Response<FlowStatus> response =
+    Response<FlowExecution> response =
         _restClient.get().sendRequest(getRequest).getResponse();
     return response.getEntity();
   }
 
   /**
-   * Get the latest flow status
-   * @param flowId identifier of flow status to get
-   * @return a {@link FlowStatus} with the flow status
+   * Get the latest flow execution
+   * @param flowId identifier of flow execution to get
+   * @return a {@link FlowExecution}
    * @throws RemoteInvocationException
    */
-  public FlowStatus getLatestFlowStatus(FlowId flowId)
+  public FlowExecution getLatestFlowExecution(FlowId flowId)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+    LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " +
         flowId.getFlowName());
 
-    FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).build();
+    FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId).build();
 
-    Response<CollectionResponse<FlowStatus>> response =
+    Response<CollectionResponse<FlowExecution>> response =
         _restClient.get().sendRequest(findRequest).getResponse();
 
-    List<FlowStatus> flowStatusList = response.getEntity().getElements();
+    List<FlowExecution> flowExecutionList = response.getEntity().getElements();
 
-    if (flowStatusList.isEmpty()) {
+    if (flowExecutionList.isEmpty()) {
       return null;
     } else {
-      Preconditions.checkArgument(flowStatusList.size() == 1);
-      return flowStatusList.get(0);
+      Preconditions.checkArgument(flowExecutionList.size() == 1);
+      return flowExecutionList.get(0);
     }
   }
 
   /**
-   * Get the latest flow status
-   * @param flowId identifier of flow status to get
-   * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, containing only
+   * Get the latest k flow executions
+   * @param flowId identifier of flow execution to get
+   * @return a list of {@link FlowExecution}es corresponding to the latest <code>count</code> executions, containing only
    * jobStatuses that match the given tag.
    * @throws RemoteInvocationException
    */
-  public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count, String tag)
+  public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+    LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " +
         flowId.getFlowName() + " count " + Integer.toString(count));
 
-    FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
+    FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId).
         addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build();
 
-    Response<CollectionResponse<FlowStatus>> response =
+    Response<CollectionResponse<FlowExecution>> response =
         _restClient.get().sendRequest(findRequest).getResponse();
 
-    List<FlowStatus> flowStatusList = response.getEntity().getElements();
+    List<FlowExecution> flowExecutionList = response.getEntity().getElements();
 
-    if (flowStatusList.isEmpty()) {
+    if (flowExecutionList.isEmpty()) {
       return null;
     } else {
-      return flowStatusList;
+      return flowExecutionList;
     }
   }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
index 3da1547..def8a5a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowStatusClient.java
@@ -44,7 +44,9 @@ import com.linkedin.restli.common.EmptyRecord;
 
 /**
  * Flow status client for REST flow status server
+ * @deprecated Use {@link FlowExecutionClient}
  */
+@Deprecated
 public class FlowStatusClient implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FlowStatusClient.class);
 
@@ -57,7 +59,7 @@ public class FlowStatusClient implements Closeable {
    * @param serverUri address and port of the REST server
    */
   public FlowStatusClient(String serverUri) {
-    LOG.debug("FlowConfigClient with serverUri " + serverUri);
+    LOG.debug("FlowStatusClient with serverUri " + serverUri);
 
     _httpClientFactory = Optional.of(new HttpClientFactory());
     Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap()));
@@ -71,7 +73,7 @@ public class FlowStatusClient implements Closeable {
    * @param restClient restClient to send restli request
    */
   public FlowStatusClient(RestClient restClient) {
-    LOG.debug("FlowConfigClient with restClient " + restClient);
+    LOG.debug("FlowStatusClient with restClient " + restClient);
 
     _httpClientFactory = Optional.absent();
     _restClient = Optional.of(restClient);
@@ -91,7 +93,7 @@ public class FlowStatusClient implements Closeable {
    */
   public FlowStatus getFlowStatus(FlowStatusId flowStatusId)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowStatusId.getFlowGroup() + " flowName " +
+    LOG.debug("getFlowStatus with groupName " + flowStatusId.getFlowGroup() + " flowName " +
         flowStatusId.getFlowName());
 
     GetRequest<FlowStatus> getRequest = _flowstatusesRequestBuilders.get()
@@ -110,7 +112,7 @@ public class FlowStatusClient implements Closeable {
    */
   public FlowStatus getLatestFlowStatus(FlowId flowId)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+    LOG.debug("getFlowStatus with groupName " + flowId.getFlowGroup() + " flowName " +
         flowId.getFlowName());
 
     FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).build();
@@ -129,7 +131,7 @@ public class FlowStatusClient implements Closeable {
   }
 
   /**
-   * Get the latest flow status
+   * Get the latest k flow statuses
    * @param flowId identifier of flow status to get
    * @return a list of {@link FlowStatus}es corresponding to the latest <code>count</code> executions, containing only
    * jobStatuses that match the given tag.
@@ -137,7 +139,7 @@ public class FlowStatusClient implements Closeable {
    */
   public List<FlowStatus> getLatestFlowStatus(FlowId flowId, Integer count, String tag)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+    LOG.debug("getFlowStatus with groupName " + flowId.getFlowGroup() + " flowName " +
         flowId.getFlowName() + " count " + Integer.toString(count));
 
     FindRequest<FlowStatus> findRequest = _flowstatusesRequestBuilders.findByLatestFlowStatus().flowIdParam(flowId).
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
similarity index 68%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 767efa4..960415d 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -29,7 +29,9 @@ import com.google.inject.Inject;
 import com.linkedin.data.template.SetMode;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.Context;
 import com.linkedin.restli.server.annotations.Finder;
 import com.linkedin.restli.server.annotations.Optional;
@@ -42,64 +44,84 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 
 
 /**
- * Resource for handling flow status requests
+ * Resource for handling flow execution requests
  */
-@RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
-public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
-  private static final Logger LOG = LoggerFactory.getLogger(FlowStatusResource.class);
+@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
+public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionResource.class);
   public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
   public static final String MESSAGE_SEPARATOR = ", ";
 
   @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
   FlowStatusGenerator _flowStatusGenerator;
 
-  public FlowStatusResource() {}
+  public FlowExecutionResource() {}
 
   /**
-   * Retrieve the FlowStatus with the given key
-   * @param key flow status id key containing group name and flow name
-   * @return {@link FlowStatus} with flow status for the latest execution of the flow
+   * Retrieve the FlowExecution with the given key
+   * @param key {@link FlowStatusId} of flow to get
+   * @return corresponding {@link FlowExecution}
    */
   @Override
-  public FlowStatus get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+  public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    // this returns null to raise a 404 error if flowStatus is null
+    return convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+  }
+
+  @Finder("latestFlowExecution")
+  public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context,
+      @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
+
+    if (flowStatuses != null) {
+      return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
+    }
+
+    // will return 404 status code
+    return null;
+  }
+
+  /**
+   * Kill the FlowExecution with the given key
+   * @param key {@link FlowStatusId} of flow to kill
+   * @return {@link UpdateResponse}
+   */
+  @Override
+  public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    Long flowExecutionId = key.getKey().getFlowExecutionId();
+    _flowStatusGenerator.killFlow(flowGroup, flowName, flowExecutionId);
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+
+  public static org.apache.gobblin.service.monitoring.FlowStatus getFlowStatusFromGenerator(ComplexResourceKey<FlowStatusId, EmptyRecord> key,
+      FlowStatusGenerator flowStatusGenerator) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     long flowExecutionId = key.getKey().getFlowExecutionId();
 
     LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
 
-    org.apache.gobblin.service.monitoring.FlowStatus flowStatus =
-        _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
-
-    // this returns null to raise a 404 error if flowStatus is null
-    return convertFlowStatus(flowStatus);
+    return flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
   }
 
-  @Finder("latestFlowStatus")
-  public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
-      @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
+  public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
+      Integer count, String tag, FlowStatusGenerator flowStatusGenerator) {
     if (count == null) {
       count = 1;
     }
-    LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
-
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
-        _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
+    LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
 
-    if (flowStatuses != null) {
-      return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
-    }
-
-    // will return 404 status code
-    return null;
+    return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
   }
 
   /**
-   * Forms a {@link org.apache.gobblin.service.FlowStatus} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+   * Forms a {@link FlowExecution} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
    * @param monitoringFlowStatus
-   * @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+   * @return a {@link FlowExecution} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
    */
-  private FlowStatus convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
+  public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
     if (monitoringFlowStatus == null) {
       return null;
     }
@@ -151,7 +173,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
         flowMessagesStringBuffer.substring(0, flowMessagesStringBuffer.length() -
             MESSAGE_SEPARATOR.length()) : StringUtils.EMPTY;
 
-    return new FlowStatus()
+    return new FlowExecution()
         .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
             .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
         .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 767efa4..6047b24 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -17,16 +17,13 @@
 
 package org.apache.gobblin.service;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
-import com.linkedin.data.template.SetMode;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.server.PagingContext;
@@ -38,7 +35,6 @@ import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 
 
 /**
@@ -46,7 +42,6 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever;
  */
 @RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
 public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
-  private static final Logger LOG = LoggerFactory.getLogger(FlowStatusResource.class);
   public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
   public static final String MESSAGE_SEPARATOR = ", ";
 
@@ -62,29 +57,14 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
    */
   @Override
   public FlowStatus get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    long flowExecutionId = key.getKey().getFlowExecutionId();
-
-    LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
-
-    org.apache.gobblin.service.monitoring.FlowStatus flowStatus =
-        _flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
-
     // this returns null to raise a 404 error if flowStatus is null
-    return convertFlowStatus(flowStatus);
+    return convertFlowStatus(FlowExecutionResource.getFlowStatusFromGenerator(key, this._flowStatusGenerator));
   }
 
   @Finder("latestFlowStatus")
   public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
       @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
-    if (count == null) {
-      count = 1;
-    }
-    LOG.info("getLatestFlowStatus called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
-
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
-        _flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag);
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator);
 
     if (flowStatuses != null) {
       return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
@@ -96,77 +76,18 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
 
   /**
    * Forms a {@link org.apache.gobblin.service.FlowStatus} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
+   * Logic is used from {@link FlowExecutionResource} since this class is deprecated
    * @param monitoringFlowStatus
    * @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
    */
   private FlowStatus convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
-    if (monitoringFlowStatus == null) {
-      return null;
-    }
-
-    Iterator<org.apache.gobblin.service.monitoring.JobStatus> jobStatusIter = monitoringFlowStatus.getJobStatusIterator();
-    JobStatusArray jobStatusArray = new JobStatusArray();
-    FlowId flowId = new FlowId().setFlowName(monitoringFlowStatus.getFlowName())
-        .setFlowGroup(monitoringFlowStatus.getFlowGroup());
-
-    long flowEndTime = 0L;
-    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
-    StringBuffer flowMessagesStringBuffer = new StringBuffer();
-
-    while (jobStatusIter.hasNext()) {
-      org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
-
-      // Check if this is the flow status instead of a single job status
-      if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
-        flowEndTime = queriedJobStatus.getEndTime();
-        flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
-        continue;
-      }
-
-      JobStatus jobStatus = new JobStatus();
-
-      jobStatus.setFlowId(flowId)
-          .setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
-              .setJobGroup(queriedJobStatus.getJobGroup()))
-          .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
-          .setExecutionStatistics(new JobStatistics()
-              .setExecutionStartTime(queriedJobStatus.getStartTime())
-              .setExecutionEndTime(queriedJobStatus.getEndTime())
-              .setProcessedCount(queriedJobStatus.getProcessedCount()))
-          .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
-          .setMessage(queriedJobStatus.getMessage())
-          .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
-              setHighWatermark(queriedJobStatus.getHighWatermark()));
-
-      jobStatusArray.add(jobStatus);
-
-      if (!queriedJobStatus.getMessage().isEmpty()) {
-        flowMessagesStringBuffer.append(queriedJobStatus.getMessage());
-        flowMessagesStringBuffer.append(MESSAGE_SEPARATOR);
-      }
-    }
-
-    String flowMessages = flowMessagesStringBuffer.length() > 0 ?
-        flowMessagesStringBuffer.substring(0, flowMessagesStringBuffer.length() -
-            MESSAGE_SEPARATOR.length()) : StringUtils.EMPTY;
-
+    FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(monitoringFlowStatus);
     return new FlowStatus()
-        .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
-            .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
-        .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
-            .setExecutionEndTime(flowEndTime))
-        .setMessage(flowMessages)
-        .setExecutionStatus(flowExecutionStatus)
-        .setJobStatuses(jobStatusArray);
-  }
-
-  /**
-   * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
-   * assumed to be the flow start time.
-   */
-  private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
-    return flowStatus.getFlowExecutionId();
+        .setId(flowExecution.getId())
+        .setExecutionStatistics(flowExecution.getExecutionStatistics())
+        .setMessage(flowExecution.getMessage())
+        .setExecutionStatus(flowExecution.getExecutionStatus())
+        .setJobStatuses(flowExecution.getJobStatuses());
   }
 }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index c757485..a7a451d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
 
 import lombok.Builder;
 
@@ -38,6 +39,7 @@ public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
 
   private final JobStatusRetriever jobStatusRetriever;
+  private final EventBus eventBus;
 
   /**
    * Get the flow statuses of last <code>count</code> (or fewer) executions
@@ -120,4 +122,11 @@ public class FlowStatusGenerator {
     String status = jobStatus.getEventName().toUpperCase();
     return !FINISHED_STATUSES.contains(status);
   }
+
+  /**
+   * Send kill request for the given flow
+   */
+  public void killFlow(String flowGroup, String flowName, Long flowExecutionId) {
+    this.eventBus.post(new KillFlowEvent(flowGroup, flowName, flowExecutionId));
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/KillFlowEvent.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/KillFlowEvent.java
new file mode 100644
index 0000000..ae0458e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/KillFlowEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+
+@AllArgsConstructor
+@Data
+public class KillFlowEvent {
+  private String flowGroup;
+  private String flowName;
+  private Long flowExecutionId;
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 0818b1d..5cd3bc2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -118,6 +118,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   private volatile boolean stopInProgress = false;
 
   // An EventBus used for communications between services running in the ApplicationMaster
+  @Getter
   protected final EventBus eventBus = new EventBus(GobblinServiceManager.class.getSimpleName());
 
   protected final FileSystem fs;
@@ -490,6 +491,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
       if (this.isDagManagerEnabled) {
         this.dagManager.setActive(true);
+        this.eventBus.register(this.dagManager);
       }
     }
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 1dff94a..67b3e38 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -43,6 +43,7 @@ import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -75,6 +76,7 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -274,11 +276,28 @@ public class DagManager extends AbstractIdleService {
     log.info("Found {} flows to cancel.", flowExecutionIds.size());
 
     for (long flowExecutionId : flowExecutionIds) {
-      int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
-      String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
-      if (!this.cancelQueue[queueId].offer(dagId)) {
-        throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
-      }
+      killFlow(flowGroup, flowName, flowExecutionId);
+    }
+  }
+
+  /**
+   * Add the specified flow to {@link DagManager#cancelQueue}
+   */
+  private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
+    int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
+    String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
+    if (!this.cancelQueue[queueId].offer(dagId)) {
+      throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
+    }
+  }
+
+  @Subscribe
+  public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
+    log.info("Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
+    try {
+      killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
+    } catch (IOException e) {
+      log.warn("Failed to kill flow", e);
     }
   }