Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/23 17:04:49 UTC

[gobblin] branch master updated: [GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution (#3558)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bbd18f33c [GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution (#3558)
bbd18f33c is described below

commit bbd18f33c9df0776c837b2df1353270b5719edae
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri Sep 23 10:04:41 2022 -0700

    [GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution (#3558)
    
    * address comments
    
    * use connection manager when httpclient is not closeable
    
    * [GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution
    
    * add a new flow execution handler that uses the DagActionStore to persist dag actions so that other hosts can pick them up (a usage sketch follows the changed-file summary below)
    
    * Make dag manager integrate with the dag action store
    
    * address comments
    
    * address comments
    
    * fix typo and add comments
    
    * [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
    
    * before starting reduce
    * after first record is reduced
    * after reducing every 1000 records
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
    
    * [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
    
    * [GOBBLIN-1673] Schema for dynamic work unit message
    
    * [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
    
    * [GOBBLIN-1698] Fast fail during work unit generation based on config. (#3542)
    
    * fast fail during work unit generation based on config.
    
    * [GOBBLIN-1690] Added logging to ORC writer
    
    Closes #3543 from rdsr/master
    
    * [GOBBLIN-1678] Refactor git flowgraph component to be extensible (#3536)
    
    * Refactor git flowgraph component to be extensible
    
    * Move files to appropriate modules
    
    * Cleanup and add javadocs
    
    * Cleanup, add missing javadocs
    
    * Address review and import order
    
    * Fix findbugs
    
    * Use java sort instead of collections
    
    * Add GMCE topic explicitly to hive commit event (#3547)
    
    * [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
    
    * address comments
    
    * use connection manager when httpclient is not closeable
    
    * [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
    
    * add orchestrator as listener before service starts
    
    * fix code style
    
    * address comments
    
    * fix test case to test orchestrator as one listener of flow spec
    
    * remove unintentional change
    
    * remove unused import
    
    * address comments
    
    * fix typo
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
    
    * fast fail during work unit generation based on config.
    
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
    Co-authored-by: Ratandeep <rd...@gmail.com>
    Co-authored-by: William Lo <lo...@gmail.com>
    Co-authored-by: Jack Moseley <jm...@linkedin.com>
    Co-authored-by: Zihan Li <zi...@linkedin.com>
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
    
    * Define basics for collecting Iceberg metadata for the current snapshot (#3559)
    
    * [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (#3554)
    
    * remove jcenter
    * Use gradle plugin portal for shadow
    * Use maven central in all other cases
    
    * [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (#3551)
    
    * Allow first time failure to authenticate with Azkaban to fail silently
    
    * Fix findbugs report
    
    * Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
    
    * Add handling for fetchSession throwing an exception
    
    * Add logging when fails on constructor and initialization, but continue to local deploy
    
    * Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
    
    * Fixed vars
    
    * Revert changes on azkabanSpecProducer
    
    * clean up error throwing
    
    * revert function checking changes
    
    * Reformat file
    
    * Clean up function
    
    * Format file for try/catch
    
    * Allow first time failure to authenticate with Azkaban to fail silently
    
    * Fix findbugs report
    
    * Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
    
    * Fixed rebase
    
    * Fixed rebase
    
    * Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
    
    * Add whitespace back
    
    * fix helix job wait completion bug when job goes to STOPPING state (#3556)
    
    address comments
    
    update stoppingStateEndTime with currentTime
    
    update test cases
    
    * [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
    
    * before starting reduce
    * after first record is reduced
    * after reducing every 1000 records
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
    
    * Define basics for collecting Iceberg metadata for the current snapshot
    
    * [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
    
    * [GOBBLIN-1673] Schema for dynamic work unit message
    
    * [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
    
    * Address review comments
    
    * Correct import order
    
    Co-authored-by: Matthew Ho <ho...@gmail.com>
    Co-authored-by: Andy Jiang <20...@users.noreply.github.com>
    Co-authored-by: Hanghang Nate Liu <na...@gmail.com>
    Co-authored-by: umustafi <um...@gmail.com>
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
    Co-authored-by: William Lo <lo...@gmail.com>
    
    * [GOBBLIN-1710]  Codecov should be optional in CI and not fail Github Actions (#3562)
    
    * [GOBBLIN-1711] Replace Jcenter with maven central (#3566)
    
    * [GOBBLIN-1697] Have a separate resource handler to rely on CDC stream to do message forwarding (#3549)
    
    * address comments
    
    * use connection manager when httpclient is not closeable
    
    * fix test case to test orchestrator as one listener of flow spec
    
    * remove unintentional change
    
    * [GOBBLIN-1697] Have a separate resource handler to rely on CDC stream to do message forwarding
    
    * fix compilation error
    
    * address comments
    
    * address comments
    
    * address comments
    
    * update outdated javadoc
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
    
    * [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)
    
    * initial commit for iceberg distcp.
    
    * adding copy entity helper and iceberg distcp template and test case.
    
    * Adding unit tests and refactoring method definitions for an Iceberg dataset.
    
    * resolve conflicts after cleaning history
    
    * update iceberg dataset and finder to include javadoc
    
    * addressed comments on PR and aligned code check style
    
    * renamed vars, added logging and updated javadoc
    
    * update dataset descriptor with ternary operation and rename fs to sourceFs
    
    * added source and target fs and update iceberg dataset finder constructor
    
    * Update source and dest dataset methods as protected and add req args constructor
    
    * change the order of attributes for iceberg dataset finder ctor
    
    * update iceberg dataset methods with correct source and target fs
    
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
    
    * [GOBBLIN-1707] Add `IcebergTableTest` unit test (#3564)
    
    * Add `IcebergTableTest` unit test
    
    * Fixup comment and indentation
    
    * Minor correction of `Long` => `Integer`
    
    * Correct comment
    
    * [GOBBLIN-1711] Replace Jcenter with maven central (#3566)
    
    * Minor rename of local var
    
    Co-authored-by: Matthew Ho <ho...@gmail.com>
    
    * [GOBBLIN-1708] Improve TimeAwareRecursiveCopyableDataset to lookback only into datefolders that match range (#3563)
    
    * Check datetime range validity prior to recursing
    
    * Remove unused packages
    
    * Remove extra line
    
    * Reformat function
    
    * Check string prior to parsing
    
    * removed unused import
    
    * Change checkpathdatetimevalidity to use available localdatetime library parsing functions
    
    * Change to isempty
    
    * Modify check path to be flexible
    
    * Update javadoc
    
    * Add unit tests and refactor
    
    * change bind class now that GOBBLIN-1697 is merged
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
    Co-authored-by: umustafi <um...@gmail.com>
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
    Co-authored-by: Matthew Ho <ho...@gmail.com>
    Co-authored-by: meethngala <me...@gmail.com>
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
    Co-authored-by: Ratandeep <rd...@gmail.com>
    Co-authored-by: William Lo <lo...@gmail.com>
    Co-authored-by: Jack Moseley <jm...@linkedin.com>
    Co-authored-by: Kip Kohn <ck...@linkedin.com>
    Co-authored-by: Andy Jiang <20...@users.noreply.github.com>
    Co-authored-by: Hanghang Nate Liu <na...@gmail.com>
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   1 +
 .../java/org/apache/gobblin/MysqlDagStore.java     |   2 +-
 .../apache/gobblin/runtime/api/DagActionStore.java |  95 +++++++++++
 .../dag_action_store/MysqlDagActionStore.java      | 174 +++++++++++++++++++++
 .../dag_action_store/MysqlDagActionStoreTest.java  | 100 ++++++++++++
 .../modules/core/GobblinServiceGuiceModule.java    |  20 ++-
 .../orchestration/AbstractUserQuotaManager.java    |   8 +-
 .../service/modules/orchestration/DagManager.java  | 131 +++++++++++-----
 .../modules/orchestration/DagManagerUtils.java     |  28 ++--
 .../modules/orchestration/FSDagStateStore.java     |   2 +-
 .../modules/orchestration/MysqlDagStateStore.java  |   4 +-
 ...lowExecutionResourceHandlerWithWarmStandby.java |  95 +++++++++++
 .../modules/orchestration/DagManagerFlowTest.java  |  51 ++++--
 .../modules/orchestration/DagManagerTest.java      |  35 +++--
 .../modules/orchestration/FSDagStateStoreTest.java |   6 +-
 .../orchestration/MysqlDagStateStoreTest.java      |   8 +-
 16 files changed, 667 insertions(+), 93 deletions(-)
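
For orientation, a minimal sketch (not part of the patch) of how the pieces below are meant to fit together: a warm-standby host persists a kill/resume request through the DagActionStore, and whichever host runs the active DagManager replays persisted actions when it becomes active. The class name and helper methods here are illustrative only; the store calls match the DagActionStore interface added in this commit.

    import java.io.IOException;
    import java.sql.SQLException;

    import org.apache.gobblin.runtime.api.DagActionStore;

    public class DagActionFlowSketch {
      /** Standby side: persist the request so the active host can act on it later (hypothetical helper). */
      static void requestKill(DagActionStore store, String flowGroup, String flowName, String flowExecutionId)
          throws IOException, SQLException {
        if (!store.exists(flowGroup, flowName, flowExecutionId)) {
          store.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
        }
      }

      /** Active side: roughly what the DagManager does with persisted actions when it becomes the leader. */
      static void replayPersistedActions(DagActionStore store) throws IOException {
        for (DagActionStore.DagAction action : store.getDagActions()) {
          switch (action.getDagActionValue()) {
            case KILL:
              // the DagManager raises a KillFlowEvent and enqueues the dag for cancellation
              break;
            case RESUME:
              // the DagManager raises a ResumeFlowEvent
              break;
            default:
              break;
          }
        }
      }
    }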

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index dbb9ef072..55b8282fe 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -152,6 +152,7 @@ public class ServiceConfigKeys {
 
   public static final int MAX_FLOW_NAME_LENGTH = 128; // defined in FlowId.pdl
   public static final int MAX_FLOW_GROUP_LENGTH = 128; // defined in FlowId.pdl
+  public static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13; // length of flowExecutionId which is epoch timestamp
   public static final int MAX_JOB_NAME_LENGTH = 374;
   public static final int MAX_JOB_GROUP_LENGTH = 374;
   public static final String STATE_STORE_TABLE_SUFFIX = "gst";
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
index e373577b1..18c56b4c9 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
@@ -52,7 +52,7 @@ public class MysqlDagStore<T extends State> extends MysqlStateStore<T> {
   protected String getCreateJobStateTableTemplate() {
     int maxStoreName = ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER.length()
         + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
-    int maxTableName = 13; // length of flowExecutionId which is epoch timestamp
+    int maxTableName = ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH;
 
     return "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(" + maxStoreName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
         + "table_name varchar(" + maxTableName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
new file mode 100644
index 000000000..5da8e6d31
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collection;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+
+public interface DagActionStore {
+  enum DagActionValue {
+    KILL,
+    RESUME
+  }
+
+  @Getter
+  @EqualsAndHashCode
+  class DagAction {
+    String flowGroup;
+    String flowName;
+    String flowExecutionId;
+    DagActionValue dagActionValue;
+    public DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
+      this.flowGroup = flowGroup;
+      this.flowName = flowName;
+      this.flowExecutionId = flowExecutionId;
+      this.dagActionValue = dagActionValue;
+    }
+  }
+
+
+  /**
+   * Check if an action exists in dagAction store by flow group, flow name and flow execution id.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution id for the dag action
+   * @throws IOException
+   */
+  boolean exists(String flowGroup, String flowName, String flowExecutionId) throws IOException, SQLException;
+
+  /**
+   * Persist the dag action in {@link DagActionStore} for durability
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution id for the dag action
+   * @param dagActionValue the value of the dag action
+   * @throws IOException
+   */
+  void addDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException;
+
+  /**
+   * delete the dag action from {@link DagActionStore}
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution id for the dag action
+   * @throws IOException
+   * @return true if we successfully delete one record, return false if the record does not exist
+   */
+  boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException;
+
+  /***
+   * Retrieve action value by the flow group, flow name and flow execution id from the {@link DagActionStore}.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution id for the dag action
+   * @throws IOException Exception in retrieving the {@link DagAction}.
+   * @throws SpecNotFoundException If {@link DagAction} being retrieved is not present in store.
+   */
+  DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException, SpecNotFoundException,
+                                                                                           SQLException;
+
+  /***
+   * Get all {@link DagAction}s from the {@link DagActionStore}.
+   * @throws IOException Exception in retrieving {@link DagAction}s.
+   */
+  Collection<DagAction> getDagActions() throws IOException;
+
+}
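
To make the contract above concrete, a short usage sketch against any DagActionStore implementation; the flow group/name and execution id are placeholder values, and the expected results mirror MysqlDagActionStoreTest further below.

    import org.apache.gobblin.runtime.api.DagActionStore;

    public class DagActionStoreCrudSketch {
      static void lifecycle(DagActionStore store) throws Exception {
        store.addDagAction("myGroup", "myFlow", "1663952681000", DagActionStore.DagActionValue.KILL);
        boolean present = store.exists("myGroup", "myFlow", "1663952681000");           // true
        DagActionStore.DagAction action = store.getDagAction("myGroup", "myFlow", "1663952681000");
        assert action.getDagActionValue() == DagActionStore.DagActionValue.KILL;
        boolean removed = store.deleteDagAction("myGroup", "myFlow", "1663952681000");  // true: one record removed
      }
    }
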
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
new file mode 100644
index 000000000..7600e304f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.dag_action_store;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashSet;
+import javax.sql.DataSource;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlDagActionStore implements DagActionStore {
+
+  public static final String CONFIG_PREFIX = "MysqlDagActionStore";
+
+
+  protected final DataSource dataSource;
+  private final String tableName;
+  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?)";
+
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s (flow_group, flow_name, flow_execution_id, dag_action ) "
+      + "VALUES (?, ?, ?, ?)";
+  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?";
+  private static final String GET_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?";
+  private static final String GET_ALL_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, dag_action FROM %s";
+  private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" +
+  "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, "
+      + "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+      + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
+      + "PRIMARY KEY (flow_group,flow_name,flow_execution_id))";
+
+  @Inject
+  public MysqlDagActionStore(Config config) throws IOException {
+    if (config.hasPath(CONFIG_PREFIX)) {
+      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    } else {
+      throw new IOException("Please specify the config for MysqlDagActionStore");
+    }
+    this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+
+    this.dataSource = MysqlStateStore.newDataSource(config);
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+      createStatement.executeUpdate();
+      connection.commit();
+    } catch (SQLException e) {
+      throw new IOException("Failure creation table " + tableName, e);
+    }
+  }
+
+  @Override
+  public boolean exists(String flowGroup, String flowName, String flowExecutionId) throws IOException, SQLException {
+    ResultSet rs = null;
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement existStatement = connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+      int i = 0;
+      existStatement.setString(++i, flowGroup);
+      existStatement.setString(++i, flowName);
+      existStatement.setString(++i, flowExecutionId);
+      rs = existStatement.executeQuery();
+      rs.next();
+      return rs.getBoolean(1);
+    } catch (SQLException e) {
+      throw new IOException(String.format("Failure checking existence for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+          tableName, flowGroup, flowName, flowExecutionId), e);
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+    }
+  }
+
+  @Override
+  public void addDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue)
+      throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement insertStatement = connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+      int i = 0;
+      insertStatement.setString(++i, flowGroup);
+      insertStatement.setString(++i, flowName);
+      insertStatement.setString(++i, flowExecutionId);
+      insertStatement.setString(++i, dagActionValue.toString());
+      insertStatement.executeUpdate();
+      connection.commit();
+    } catch (SQLException e) {
+      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+          tableName, flowGroup, flowName, flowExecutionId), e);
+    }
+  }
+
+  @Override
+  public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+      int i = 0;
+      deleteStatement.setString(++i, flowGroup);
+      deleteStatement.setString(++i, flowName);
+      deleteStatement.setString(++i, flowExecutionId);
+      int result = deleteStatement.executeUpdate();
+      connection.commit();
+      return result != 0;
+    } catch (SQLException e) {
+      throw new IOException(String.format("Failure to delete action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+          tableName, flowGroup, flowName, flowExecutionId), e);
+    }
+  }
+
+  @Override
+  public DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId)
+      throws IOException, SQLException {
+    ResultSet rs = null;
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement getStatement = connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
+      int i = 0;
+      getStatement.setString(++i, flowGroup);
+      getStatement.setString(++i, flowName);
+      getStatement.setString(++i, flowExecutionId);
+      rs = getStatement.executeQuery();
+      if (rs.next()) {
+        return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), DagActionValue.valueOf(rs.getString(4)));
+      }
+      return null;
+    } catch (SQLException e) {
+      throw new IOException(String.format("Failure get dag action from table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+          tableName, flowGroup, flowName, flowExecutionId), e);
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+    }
+  }
+
+  @Override
+  public Collection<DagAction> getDagActions() throws IOException {
+    HashSet<DagAction> result = new HashSet<>();
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement getAllStatement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
+        ResultSet rs = getAllStatement.executeQuery()) {
+      while (rs.next()) {
+        result.add(
+            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), DagActionValue.valueOf(rs.getString(4))));
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new IOException(String.format("Failure get dag actions from table %s ", tableName), e);
+    }
+  }
+}
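
The constructor above requires every connection setting to live under the "MysqlDagActionStore" scope; a minimal sketch of building such a Config, following the same pattern as the test below. The JDBC URL, credentials and table name are placeholders.

    import com.typesafe.config.Config;

    import org.apache.gobblin.config.ConfigBuilder;
    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;

    public class MysqlDagActionStoreConfigSketch {
      static MysqlDagActionStore create() throws Exception {
        Config config = ConfigBuilder.create()
            .addPrimitive(MysqlDagActionStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY,
                "jdbc:mysql://localhost/gobblin")     // placeholder URL
            .addPrimitive(MysqlDagActionStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, "gobblin")
            .addPrimitive(MysqlDagActionStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "secret")
            .addPrimitive(MysqlDagActionStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag_action_store")
            .build();
        return new MysqlDagActionStore(config);       // creates the backing table if it does not exist
      }
    }
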
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
new file mode 100644
index 000000000..0c65f2241
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.dag_action_store;
+
+import java.io.IOException;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlDagActionStoreTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "dag_action_store";
+  private static final String flowGroup = "testFlowGroup";
+  private static final String flowName = "testFlowName";
+  private static final String flowExecutionId = "12345677";
+  private static final String flowExecutionId_2 = "12345678";
+  private static final String flowExecutionId_3 = "12345679";
+  private MysqlDagActionStore mysqlDagActionStore;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.mysqlDagActionStore = new MysqlDagActionStore(config);
+  }
+
+  @Test
+  public void testAddAction() throws Exception {
+    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
+    //Should not be able to add again when the previous one exists
+    Assert.expectThrows(IOException.class,
+        () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.RESUME));
+    //Should be able to add a non-existent one
+    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
+  }
+
+  @Test(dependsOnMethods = "testAddAction")
+  public void testExists() throws Exception {
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2));
+    Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_3));
+  }
+
+  @Test(dependsOnMethods = "testExists")
+  public void testGetAction() throws IOException, SQLException {
+    Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL), this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId));
+    Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME), this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId_2));
+    Collection<DagActionStore.DagAction> dagActions = this.mysqlDagActionStore.getDagActions();
+    Assert.assertEquals(2, dagActions.size());
+    HashSet<DagActionStore.DagAction> set = new HashSet<>();
+    set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL));
+    set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME));
+    Assert.assertEquals(dagActions, set);
+  }
+
+  @Test(dependsOnMethods = "testGetAction")
+  public void testDeleteAction() throws IOException, SQLException {
+   this.mysqlDagActionStore.deleteDagAction(flowGroup, flowName, flowExecutionId);
+   Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 1);
+   Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId));
+   Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2));
+   Assert.assertNull( this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId));
+  }
+
+}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 310073a8c..6b9645f19 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -19,7 +19,12 @@ package org.apache.gobblin.service.modules.core;
 
 import java.util.Objects;
 
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+//import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
@@ -139,10 +144,17 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bindConstant()
         .annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
         .to(serviceConfig.isWarmStandbyEnabled());
-
-    binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
-    binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
-    binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
+    OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
+    if (serviceConfig.isWarmStandbyEnabled()) {
+      binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
+      binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
+      binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.class);
+      binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.class);
+    } else {
+      binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
+      binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
+      binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
+    }
 
 
     binder.bind(FlowConfigsResource.class);
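
The OptionalBinder call above is what lets consumers such as DagManager inject an Optional<DagActionStore> that is absent unless warm standby is enabled. A self-contained sketch of that pattern with stand-in types rather than the real Gobblin classes:

    import com.google.common.base.Optional;
    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Inject;
    import com.google.inject.multibindings.OptionalBinder;

    public class OptionalBindingSketch {
      interface ActionStore {}                                  // stand-in for DagActionStore
      static class MysqlActionStore implements ActionStore {}   // stand-in for MysqlDagActionStore

      static class Consumer {                                   // stand-in for DagManager
        final Optional<ActionStore> store;
        @Inject
        Consumer(Optional<ActionStore> store) { this.store = store; }
      }

      public static void main(String[] args) {
        final boolean warmStandbyEnabled = true;                // driven by the service config in the real module
        Consumer consumer = Guice.createInjector(new AbstractModule() {
          @Override
          protected void configure() {
            OptionalBinder.newOptionalBinder(binder(), ActionStore.class);
            if (warmStandbyEnabled) {
              bind(ActionStore.class).to(MysqlActionStore.class);
            }
          }
        }).getInstance(Consumer.class);
        System.out.println(consumer.store.isPresent());         // present only when the binding was added
      }
    }
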
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index 8d6cab041..fad079bf0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -93,16 +93,16 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
     decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
     decrementQuotaUsageForUsers(usersQuotaIncrement);
     decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
-    runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
   }
 
   protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
     QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
     // Dag is already being tracked, no need to double increment for retries and multihop flows
-    if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode))) {
+    if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode).toString())) {
       return quotaCheck;
     } else {
-      runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+      runningDagIds.add(DagManagerUtils.generateDagId(dagNode).toString());
     }
 
     String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
@@ -169,7 +169,7 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
    * Returns true if the dag existed in the set of running dags and was removed successfully
    */
   public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
-    boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
     if (!val) {
       return false;
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3543c0e16..fcb2ff4c1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,9 +18,13 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import com.codahale.metrics.Meter;
+import com.google.common.base.Joiner;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,8 +53,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigException;
 
-import javax.inject.Inject;
-import javax.inject.Singleton;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -62,6 +65,7 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -156,9 +160,27 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  @Getter
+  @EqualsAndHashCode
+  public static class DagId {
+    String flowGroup;
+    String flowName;
+    String flowExecutionId;
+    public DagId(String flowGroup, String flowName, String flowExecutionId) {
+      this.flowGroup = flowGroup;
+      this.flowName = flowName;
+      this.flowExecutionId = flowExecutionId;
+    }
+
+    @Override
+    public String toString() {
+      return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+    }
+  }
+
   private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
-  private final BlockingQueue<String>[] cancelQueue;
-  private final BlockingQueue<String>[] resumeQueue;
+  private final BlockingQueue<DagId>[] cancelQueue;
+  private final BlockingQueue<DagId>[] resumeQueue;
   DagManagerThread[] dagManagerThreads;
 
   private final ScheduledExecutorService scheduledExecutorPool;
@@ -177,15 +199,16 @@ public class DagManager extends AbstractIdleService {
   private final Optional<EventSubmitter> eventSubmitter;
   private final long failedDagRetentionTime;
   private final DagManagerMetrics dagManagerMetrics;
+  private final Optional<DagActionStore> dagActionStore;
 
   private volatile boolean isActive = false;
 
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore, boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
     this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) initializeDagQueue(this.numThreads);
-    this.cancelQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
-    this.resumeQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
+    this.cancelQueue = (BlockingQueue<DagId>[]) initializeDagQueue(this.numThreads);
+    this.resumeQueue = (BlockingQueue<DagId>[]) initializeDagQueue(this.numThreads);
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
@@ -197,6 +220,7 @@ public class DagManager extends AbstractIdleService {
     } else {
       this.eventSubmitter = Optional.absent();
     }
+    this.dagActionStore = dagActionStore;
     this.dagManagerMetrics = new DagManagerMetrics(metricContext);
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -224,9 +248,9 @@ public class DagManager extends AbstractIdleService {
     return queue;
   }
 
-  @Inject
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
-    this(config, jobStatusRetriever, true);
+  @Inject(optional = true)
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore) {
+    this(config, jobStatusRetriever, dagActionStore, true);
   }
 
   /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
@@ -295,7 +319,7 @@ public class DagManager extends AbstractIdleService {
    */
   private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
     int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
-    String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
+    DagId dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
     if (!this.cancelQueue[queueId].offer(dagId)) {
       throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
     }
@@ -303,21 +327,27 @@ public class DagManager extends AbstractIdleService {
 
   @Subscribe
   public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
-    log.info("Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
-    try {
-      killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
-    } catch (IOException e) {
-      log.warn("Failed to kill flow", e);
+    if (isActive) {
+      log.info("Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(),
+          killFlowEvent.getFlowExecutionId());
+      try {
+        killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
+      } catch (IOException e) {
+        log.warn("Failed to kill flow", e);
+      }
     }
   }
 
   @Subscribe
   public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
-    log.info("Received resume request for flow ({}, {}, {})", resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
-    String dagId = DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
-    int queueId = DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), this.numThreads);
-    if (!this.resumeQueue[queueId].offer(dagId)) {
-      log.warn("Could not add dag " + dagId + " to resume queue");
+    if (isActive) {
+      log.info("Received resume request for flow ({}, {}, {})", resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
+      DagId dagId = DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(),
+          resumeFlowEvent.getFlowExecutionId());
+      int queueId = DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), this.numThreads);
+      if (!this.resumeQueue[queueId].offer(dagId)) {
+        log.warn("Could not add dag " + dagId + " to resume queue");
+      }
     }
   }
 
@@ -356,7 +386,7 @@ public class DagManager extends AbstractIdleService {
         //On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
-          DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
+          DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore, dagActionStore,
               runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
               this.defaultJobStartSlaTimeMillis, quotaManager, i);
           this.dagManagerThreads[i] = dagManagerThread;
@@ -369,6 +399,21 @@ public class DagManager extends AbstractIdleService {
         for (Dag<JobExecutionPlan> dag : dags) {
           addDag(dag, false, false);
         }
+        if (dagActionStore.isPresent()) {
+          Collection<DagActionStore.DagAction> dagActions = dagActionStore.get().getDagActions();
+          for (DagActionStore.DagAction action : dagActions) {
+            switch (action.getDagActionValue()) {
+              case KILL:
+                this.handleKillFlowEvent(new KillFlowEvent(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId())));
+                break;
+              case RESUME:
+                this.handleResumeFlowEvent(new ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId())));
+                break;
+              default:
+                log.warn("Unsupported dagAction: " + action.getDagActionValue().toString());
+            }
+          }
+        }
       } else { //Mark the DagManager inactive.
         log.info("Inactivating the DagManager. Shutting down all DagManager threads");
         this.scheduledExecutorPool.shutdown();
@@ -413,16 +458,17 @@ public class DagManager extends AbstractIdleService {
     private final DagStateStore dagStateStore;
     private final DagStateStore failedDagStateStore;
     private final BlockingQueue<Dag<JobExecutionPlan>> queue;
-    private final BlockingQueue<String> cancelQueue;
-    private final BlockingQueue<String> resumeQueue;
+    private final BlockingQueue<DagId> cancelQueue;
+    private final BlockingQueue<DagId> resumeQueue;
     private final Long defaultJobStartSlaTimeMillis;
+    private final Optional<DagActionStore> dagActionStore;
     private final Optional<Meter> dagManagerThreadHeartbeat;
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
-        BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
+        Optional<DagActionStore> dagActionStore, BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<DagId> cancelQueue,
+        BlockingQueue<DagId> resumeQueue, boolean instrumentationEnabled, Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
         Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
@@ -434,6 +480,7 @@ public class DagManager extends AbstractIdleService {
       this.dagManagerMetrics = dagManagerMetrics;
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
       this.quotaManager = quotaManager;
+      this.dagActionStore = dagActionStore;
 
       if (instrumentationEnabled) {
         this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
@@ -459,7 +506,7 @@ public class DagManager extends AbstractIdleService {
     @Override
     public void run() {
       try {
-        String nextDagToCancel = cancelQueue.poll();
+        DagId nextDagToCancel = cancelQueue.poll();
         //Poll the cancelQueue for a new Dag to cancel.
         if (nextDagToCancel != null) {
           cancelDag(nextDagToCancel);
@@ -478,7 +525,7 @@ public class DagManager extends AbstractIdleService {
         }
 
         while (!resumeQueue.isEmpty()) {
-          String dagId = resumeQueue.poll();
+          DagId dagId = resumeQueue.poll();
           beginResumingDag(dagId);
         }
 
@@ -498,18 +545,27 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    private void clearUpDagAction(DagId dagId) throws IOException {
+      if (this.dagActionStore.isPresent()) {
+        this.dagActionStore.get().deleteDagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId);
+      }
+    }
+
     /**
      * Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
      * and also sending events so that this status will be reflected in the job status state store.
      */
-    private void beginResumingDag(String dagId) throws IOException {
+    private void beginResumingDag(DagId dagIdToResume) throws IOException {
+      String dagId = dagIdToResume.toString();
       if (!this.failedDagIds.contains(dagId)) {
         log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
+        clearUpDagAction(dagIdToResume);
         return;
       }
       Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
       if (dag == null) {
         log.error("Dag " + dagId + " was found in memory but not found in failed dag state store");
+        clearUpDagAction(dagIdToResume);
         return;
       }
 
@@ -560,6 +616,7 @@ public class DagManager extends AbstractIdleService {
         if (dagReady) {
           this.dagStateStore.writeCheckpoint(dag.getValue());
           this.failedDagStateStore.cleanUp(dag.getValue());
+          clearUpDagAction(DagManagerUtils.generateDagId(dag.getValue()));
           this.failedDagIds.remove(dag.getKey());
           this.resumingDags.remove(dag.getKey());
           initialize(dag.getValue());
@@ -573,7 +630,8 @@ public class DagManager extends AbstractIdleService {
      * @throws ExecutionException executionException
      * @throws InterruptedException interruptedException
      */
-    private void cancelDag(String dagToCancel) throws ExecutionException, InterruptedException {
+    private void cancelDag(DagId dagId) throws ExecutionException, InterruptedException, IOException {
+      String dagToCancel = dagId.toString();
       log.info("Cancel flow with DagId {}", dagToCancel);
       if (this.dagToJobs.containsKey(dagToCancel)) {
         List<DagNode<JobExecutionPlan>> dagNodesToCancel = this.dagToJobs.get(dagToCancel);
@@ -587,6 +645,7 @@ public class DagManager extends AbstractIdleService {
       } else {
         log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", dagToCancel);
       }
+      clearUpDagAction(dagId);
     }
 
     private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) throws ExecutionException, InterruptedException {
@@ -615,7 +674,7 @@ public class DagManager extends AbstractIdleService {
     private void initialize(Dag<JobExecutionPlan> dag)
         throws IOException {
       //Add Dag to the map of running dags
-      String dagId = DagManagerUtils.generateDagId(dag);
+      String dagId = DagManagerUtils.generateDagId(dag).toString();
       log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
       if (this.dags.containsKey(dagId)) {
         log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
@@ -730,7 +789,7 @@ public class DagManager extends AbstractIdleService {
       }
 
       for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
-        String dagId = DagManagerUtils.generateDagId(dagNode);
+        String dagId = DagManagerUtils.generateDagId(dagNode).toString();
         deleteJobState(dagId, dagNode);
       }
     }
@@ -759,7 +818,7 @@ public class DagManager extends AbstractIdleService {
         dagManagerMetrics.incrementCountsStartSlaExceeded(node);
         cancelDagNode(node);
 
-        String dagId = DagManagerUtils.generateDagId(node);
+        String dagId = DagManagerUtils.generateDagId(node).toString();
         this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
         this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
         return true;
@@ -792,7 +851,7 @@ public class DagManager extends AbstractIdleService {
     private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
       long flowStartTime = DagManagerUtils.getFlowStartTime(node);
       long currentTime = System.currentTimeMillis();
-      String dagId = DagManagerUtils.generateDagId(node);
+      String dagId = DagManagerUtils.generateDagId(node).toString();
 
       long flowSla;
       if (dagToSLA.containsKey(dagId)) {
@@ -924,7 +983,7 @@ public class DagManager extends AbstractIdleService {
         Future<?> addSpecFuture = producer.addSpec(jobSpec);
         dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
         //Persist the dag
-        this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
+        this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode).toString()));
 
         addSpecFuture.get();
 
@@ -957,7 +1016,7 @@ public class DagManager extends AbstractIdleService {
     private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
         throws IOException {
       Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
-      String dagId = DagManagerUtils.generateDagId(dag);
+      String dagId = DagManagerUtils.generateDagId(dag).toString();
       String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index dc79524b3..d6fb0c99f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
@@ -83,29 +82,34 @@ public class DagManagerUtils {
     return Long.parseLong(dagId.substring(dagId.lastIndexOf('_') + 1));
   }
 
+
   /**
-   * Generate a dagId from the given {@link Dag} instance.
+   * Generate a dagId object from the given {@link Dag} instance.
    * @param dag instance of a {@link Dag}.
-   * @return a String id associated corresponding to the {@link Dag} instance.
+   * @return a DagId object corresponding to the {@link Dag} instance.
    */
-  static String generateDagId(Dag<JobExecutionPlan> dag) {
+  static DagManager.DagId generateDagId(Dag<JobExecutionPlan> dag) {
     return generateDagId(dag.getStartNodes().get(0).getValue().getJobSpec().getConfig());
   }
 
-  static String generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) {
-    return generateDagId(dagNode.getValue().getJobSpec().getConfig());
-  }
-
-  private static String generateDagId(Config jobConfig) {
+  private static DagManager.DagId generateDagId(Config jobConfig) {
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
     long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
 
-    return generateDagId(flowGroup, flowName, flowExecutionId);
+    return new DagManager.DagId(flowGroup, flowName, String.valueOf(flowExecutionId));
+  }
+
+  static DagManager.DagId generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return generateDagId(dagNode.getValue().getJobSpec().getConfig());
+  }
+
+  static DagManager.DagId generateDagId(String flowGroup, String flowName, long flowExecutionId) {
+    return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
   }
 
-  static String generateDagId(String flowGroup, String flowName, long flowExecutionId) {
-    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+  static DagManager.DagId generateDagId(String flowGroup, String flowName, String flowExecutionId) {
+    return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
   }
 
   /**
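
The DagManagerUtils hunk above replaces the raw String id with a structured DagManager.DagId. A minimal sketch of how a caller inside the orchestration package might bridge the two forms, assuming DagId.toString() still yields the underscore-joined id that the Joiner-based method used to return and that DagStateStore keeps its String-keyed cleanUp (the helper class and method names here are illustrative, not part of the patch):

    package org.apache.gobblin.service.modules.orchestration;  // generateDagId(...) is package-private

    import java.io.IOException;

    import org.apache.gobblin.service.modules.flowgraph.Dag;
    import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

    class DagIdUsageSketch {
      // Build the structured id, then fall back to the legacy String form for stores
      // that are still keyed on "flowGroup_flowName_flowExecutionId".
      static void cleanUp(DagStateStore store, Dag<JobExecutionPlan> dag) throws IOException {
        DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
        store.cleanUp(dagId.toString());
      }
    }
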
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 6cc3cdb32..238c19edc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -101,7 +101,7 @@ public class FSDagStateStore implements DagStateStore {
    */
   @Override
   public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
-    cleanUp(DagManagerUtils.generateDagId(dag));
+    cleanUp(DagManagerUtils.generateDagId(dag).toString());
   }
 
   /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 9f98a4bad..2a11fef26 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -107,13 +107,13 @@ public class MysqlDagStateStore implements DagStateStore {
   @Override
   public void writeCheckpoint(Dag<JobExecutionPlan> dag)
       throws IOException {
-    mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag)), getTableNameFromDagId(generateDagId(dag)), convertDagIntoState(dag));
+    mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag).toString()), getTableNameFromDagId(generateDagId(dag).toString()), convertDagIntoState(dag));
   }
 
   @Override
   public void cleanUp(Dag<JobExecutionPlan> dag)
       throws IOException {
-    cleanUp(generateDagId(dag));
+    cleanUp(generateDagId(dag).toString());
   }
 
   @Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
new file mode 100644
index 000000000..3f35152be
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.RestLiServiceException;
+import com.linkedin.restli.server.UpdateResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import javax.inject.Named;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.helix.HelixManager;
+
+@Slf4j
+public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends GobblinServiceFlowExecutionResourceHandler {
+  private DagActionStore dagActionStore;
+  @Inject
+  public GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler handler,
+      @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
+      Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader, DagActionStore dagActionStore) {
+    super(handler, eventBus, manager, forceLeader);
+    this.dagActionStore = dagActionStore;
+  }
+
+
+  @Override
+  public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    Long flowExecutionId = key.getKey().getFlowExecutionId();
+    try {
+      this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
+    } catch (IOException e) {
+      log.warn(
+          String.format("Failed to add execution resume action for flow %s %s %s to dag action store due to", flowGroup,
+              flowName, flowExecutionId), e);
+      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+    }
+
+  }
+
+  private void handleException (String flowGroup, String flowName, String flowExecutionId, Exception e) {
+    try {
+      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
+        throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
+      } else {
+        throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+      }
+    } catch (IOException | SQLException ex) {
+      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+    }
+  }
+
+  @Override
+  public UpdateResponse delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {
+
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    Long flowExecutionId = key.getKey().getFlowExecutionId();
+    try {
+      this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.KILL);
+      return new UpdateResponse(HttpStatus.S_200_OK);
+    } catch (IOException e) {
+      log.warn(
+          String.format("Failed to add execution delete action for flow %s %s %s to dag action store due to", flowGroup,
+              flowName, flowExecutionId), e);
+      handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+      return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
+    }
+  }
+}
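
The warm-standby handler above does not execute the kill/resume itself; it only persists a KILL or RESUME row that the active DagManager later consumes. A short hedged sketch of that store interaction, restricted to the DagActionStore methods visible in this patch (the wrapper class and method names are illustrative):

    import java.io.IOException;
    import java.sql.SQLException;

    import org.apache.gobblin.runtime.api.DagActionStore;

    class DagActionStoreSketch {
      // Record a kill request for one flow execution and confirm it was persisted;
      // the active DagManager is expected to pick the entry up and remove it.
      static void requestKill(DagActionStore store, String flowGroup, String flowName, String flowExecutionId)
          throws IOException, SQLException {
        store.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
        if (!store.exists(flowGroup, flowName, flowExecutionId)) {
          throw new IOException("Kill action was not persisted for " + flowGroup + "/" + flowName + "/" + flowExecutionId);
        }
      }
    }
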
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index cf18ddf9f..71ddd9333 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
@@ -25,6 +26,11 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -57,14 +63,37 @@ public class DagManagerFlowTest {
   MockedDagManager dagManager;
   int dagNumThreads;
   static final String ERROR_MESSAGE = "Waiting for the map to update";
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "dag_action_store";
+  private static final String flowGroup = "testFlowGroup";
+  private static final String flowName = "testFlowName";
+  private static final String flowExecutionId = "12345677";
+  private static final String flowExecutionId_2 = "12345678";
+  private DagActionStore dagActionStore;
 
   @BeforeClass
-  public void setUp() {
+  public void setUp() throws Exception {
     Properties props = new Properties();
     props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
-    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false);
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    dagActionStore = new MysqlDagActionStore(config);
+    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
+    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
+    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), Optional.of(dagActionStore), false);
     dagManager.setActive(true);
     this.dagNumThreads = dagManager.getNumThreads();
+    Thread.sleep(10000);
+    // Once active, the DagManager should process the pending requests and delete the action entries
+    Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
   }
 
   @Test
@@ -77,9 +106,9 @@ public class DagManagerFlowTest {
     Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", flowExecutionId2, "FINISH_RUNNING", 1);
     Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", flowExecutionId3, "FINISH_RUNNING", 1);
 
-    String dagId1 = DagManagerUtils.generateDagId(dag1);
-    String dagId2 = DagManagerUtils.generateDagId(dag2);
-    String dagId3 = DagManagerUtils.generateDagId(dag3);
+    String dagId1 = DagManagerUtils.generateDagId(dag1).toString();
+    String dagId2 = DagManagerUtils.generateDagId(dag2).toString();
+    String dagId3 = DagManagerUtils.generateDagId(dag3).toString();
 
     int queue1 = DagManagerUtils.getDagQueueId(dag1, dagNumThreads);
     int queue2 = DagManagerUtils.getDagQueueId(dag2, dagNumThreads);
@@ -144,7 +173,7 @@ public class DagManagerFlowTest {
   void testFlowSlaWithoutConfig() throws Exception {
     long flowExecutionId = System.currentTimeMillis();
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("3", flowExecutionId, "FINISH_RUNNING", 1);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
     int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
 
     when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow3"), eq("group3"), anyInt()))
@@ -181,7 +210,7 @@ public class DagManagerFlowTest {
   void testFlowSlaWithConfig() throws Exception {
     long flowExecutionId = System.currentTimeMillis();
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("4", flowExecutionId, "FINISH_RUNNING", 1);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
     int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
 
     when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"), eq("group4"), anyInt()))
@@ -221,7 +250,7 @@ public class DagManagerFlowTest {
   void testOrphanFlowKill() throws Exception {
     Long flowExecutionId = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10);
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", flowExecutionId, "FINISH_RUNNING", 1);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
     int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
 
     // change config to set a small sla
@@ -296,7 +325,11 @@ class CancelPredicate implements Predicate<Void> {
 class MockedDagManager extends DagManager {
 
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), instrumentationEnabled);
+    super(config, createJobStatusRetriever(), Optional.absent(), instrumentationEnabled);
+  }
+
+  public MockedDagManager(Config config, Optional<DagActionStore> dagactionStore, boolean instrumentationEnabled) {
+    super(config, createJobStatusRetriever(), dagactionStore, instrumentationEnabled);
   }
 
   private static JobStatusRetriever createJobStatusRetriever() {
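
The DagManagerFlowTest changes above seed one KILL and one RESUME action before the manager is made active and then assert that the store is empty. A hedged sketch of the same lifecycle check split around setActive(), reusing the test's own fields and only the store methods exercised in this patch (the pre-activation assertions are illustrative):

    // Before activation the two seeded actions are still pending.
    Assert.assertEquals(dagActionStore.getDagActions().size(), 2);
    Assert.assertTrue(dagActionStore.exists(flowGroup, flowName, flowExecutionId));
    Assert.assertTrue(dagActionStore.exists(flowGroup, flowName, flowExecutionId_2));

    dagManager.setActive(true);
    Thread.sleep(10000);  // give the DagManager threads time to consume the actions

    // After activation the active manager has processed and deleted both entries.
    Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
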
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index d85c708f4..655c4f895 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -75,8 +76,8 @@ public class DagManagerTest {
   private DagManagerMetrics _dagManagerMetrics;
   private DagManager.DagManagerThread _dagManagerThread;
   private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
-  private LinkedBlockingQueue<String> cancelQueue;
-  private LinkedBlockingQueue<String> resumeQueue;
+  private LinkedBlockingQueue<DagManager.DagId> cancelQueue;
+  private LinkedBlockingQueue<DagManager.DagId> resumeQueue;
   private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
   private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
@@ -103,7 +104,8 @@ public class DagManagerTest {
     this._gobblinServiceQuotaManager = new InMemoryUserQuotaManager(quotaConfig);
     this._dagManagerMetrics = new DagManagerMetrics(metricContext);
     this._dagManagerMetrics.activate();
-    this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
+    this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore,
+        Optional.absent(), queue, cancelQueue,
         resumeQueue, true, new HashSet<>(), this._dagManagerMetrics, START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
 
     Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
@@ -206,7 +208,7 @@ public class DagManagerTest {
     String jobName2 = "job2";
 
     Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
@@ -287,7 +289,7 @@ public class DagManagerTest {
       String jobName2 = "job2";
 
       Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, failureOption, false);
-      String dagId = DagManagerUtils.generateDagId(dag);
+      String dagId = DagManagerUtils.generateDagId(dag).toString();
 
       //Add a dag to the queue of dags
       this.queue.offer(dag);
@@ -391,7 +393,7 @@ public class DagManagerTest {
     String jobName2 = "job2";
 
     Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
@@ -443,7 +445,7 @@ public class DagManagerTest {
     Assert.assertTrue(this.failedDagIds.contains(dagId));
 
     // Resume dag
-    this.resumeQueue.offer(dagId);
+    this.resumeQueue.offer(DagManagerUtils.generateDagId(dag));
 
     // Job2 rerunning
     this._dagManagerThread.run();
@@ -470,7 +472,7 @@ public class DagManagerTest {
     String jobName2 = "job2";
 
     Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
@@ -549,7 +551,7 @@ public class DagManagerTest {
     String jobName0 = "job0";
 
     Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
@@ -614,7 +616,7 @@ public class DagManagerTest {
     String jobName2 = "job2";
 
     Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
-    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
@@ -664,13 +666,13 @@ public class DagManagerTest {
     }
 
     // Cancel job2
-    this.cancelQueue.offer(dagId);
+    this.cancelQueue.offer(DagManagerUtils.generateDagId(dag));
 
     this._dagManagerThread.run();
     Assert.assertTrue(this.failedDagIds.contains(dagId));
 
     // Resume dag
-    this.resumeQueue.offer(dagId);
+    this.resumeQueue.offer(DagManagerUtils.generateDagId(dag));
 
     // Job2 rerunning
     this._dagManagerThread.run();
@@ -697,8 +699,8 @@ public class DagManagerTest {
     Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", false);
     Dag<JobExecutionPlan> dag1 = buildDag(flowGroupId1, flowExecutionId+1, "FINISH_RUNNING", false);
 
-    String dagId = DagManagerUtils.generateDagId(dag);
-    String dagId1 = DagManagerUtils.generateDagId(dag1);
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    String dagId1 = DagManagerUtils.generateDagId(dag1).toString();
 
 
     //Add a dag to the queue of dags
@@ -828,7 +830,6 @@ public class DagManagerTest {
     JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
     jobExecutionPlans.add(jobExecutionPlan);
     Dag<JobExecutionPlan> dag = (new JobExecutionPlanDagFactory()).createDag(jobExecutionPlans);
-    String dagId = DagManagerUtils.generateDagId(dag);
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
@@ -1225,11 +1226,11 @@ public class DagManagerTest {
     private final Map<String, Dag<JobExecutionPlan>> dags = new ConcurrentHashMap<>();
 
     public void writeCheckpoint(Dag<JobExecutionPlan> dag) {
-      dags.put(DagManagerUtils.generateDagId(dag), dag);
+      dags.put(DagManagerUtils.generateDagId(dag).toString(), dag);
     }
 
     public void cleanUp(Dag<JobExecutionPlan> dag) {
-      cleanUp(DagManagerUtils.generateDagId(dag));
+      cleanUp(DagManagerUtils.generateDagId(dag).toString());
     }
 
     public void cleanUp(String dagId) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 849b8801e..0806d77ca 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -112,7 +112,7 @@ public class FSDagStateStoreTest {
 
     this._dagStateStore.writeCheckpoint(dag);
     Assert.assertTrue(dagFile.exists());
-    this._dagStateStore.cleanUp(DagManagerUtils.generateDagId(dag));
+    this._dagStateStore.cleanUp(DagManagerUtils.generateDagId(dag).toString());
     Assert.assertFalse(dagFile.exists());
   }
 
@@ -129,7 +129,7 @@ public class FSDagStateStoreTest {
 
     List<Dag<JobExecutionPlan>> dags = this._dagStateStore.getDags();
     Assert.assertEquals(dags.size(), 2);
-    Dag<JobExecutionPlan> singleDag = this._dagStateStore.getDag(DagManagerUtils.generateDagId(dags.get(0)));
+    Dag<JobExecutionPlan> singleDag = this._dagStateStore.getDag(DagManagerUtils.generateDagId(dags.get(0)).toString());
     dags.add(singleDag);
     for (Dag<JobExecutionPlan> dag: dags) {
       Assert.assertEquals(dag.getNodes().size(), 2);
@@ -145,7 +145,7 @@ public class FSDagStateStoreTest {
     Set<String> dagIds = this._dagStateStore.getDagIds();
     Assert.assertEquals(dagIds.size(), 2);
     for (Dag<JobExecutionPlan> dag: dags) {
-      Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag)));
+      Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag).toString()));
     }
   }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index 60f0f6ca1..dfbbd6a92 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -80,15 +80,15 @@ public class MysqlDagStateStoreTest {
     _dagStateStore.writeCheckpoint(dag_1);
 
     // Verify get one dag
-    Dag<JobExecutionPlan> dag = _dagStateStore.getDag(DagManagerUtils.generateDagId(dag_0));
+    Dag<JobExecutionPlan> dag = _dagStateStore.getDag(DagManagerUtils.generateDagId(dag_0).toString());
     Assert.assertEquals(dag.getNodes().get(0), dag_0.getNodes().get(0));
     Assert.assertEquals(dag.getNodes().get(1), dag_0.getNodes().get(1));
 
     // Verify get dagIds
     Set<String> dagIds = _dagStateStore.getDagIds();
     Assert.assertEquals(dagIds.size(), 2);
-    Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_0)));
-    Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_1)));
+    Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_0).toString()));
+    Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_1).toString()));
 
     // Verify get all dags
     List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
@@ -145,7 +145,7 @@ public class MysqlDagStateStoreTest {
     Assert.assertEquals(dags.size(), 2);
 
     _dagStateStore.cleanUp(dags.get(0));
-    _dagStateStore.cleanUp(DagManagerUtils.generateDagId(dags.get(1)));
+    _dagStateStore.cleanUp(DagManagerUtils.generateDagId(dags.get(1)).toString());
 
     dags = _dagStateStore.getDags();
     Assert.assertEquals(dags.size(), 0);