You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/06/11 16:13:23 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-790] DagStateStore MySQL

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dcc79d  [GOBBLIN-790] DagStateStore MySQL
9dcc79d is described below

commit 9dcc79dd406bb347b908e5a8709c40ac6ed34489
Author: autumnust <le...@linkedin.com>
AuthorDate: Tue Jun 11 09:13:16 2019 -0700

    [GOBBLIN-790] DagStateStore MySQL
    
    Closes #2656 from autumnust/DagStataToStateStore
---
 .../apache/gobblin/metastore/MysqlStateStore.java  |  73 ++++++---
 .../service/modules/orchestration/DagManager.java  |  22 ++-
 .../modules/orchestration/DagManagerUtils.java     |  43 ++++--
 .../modules/orchestration/DagStateStore.java       |   2 +-
 .../modules/orchestration/FSDagStateStore.java     |   3 +-
 .../modules/orchestration/MysqlDagStateStore.java  | 146 ++++++++++++++++++
 .../modules/orchestration/DagManagerTest.java      |   3 +-
 .../modules/orchestration/DagTestUtils.java        |  93 ++++++++++++
 .../modules/orchestration/FSDagStateStoreTest.java |  64 +-------
 .../orchestration/MysqlDagStateStoreTest.java      | 167 +++++++++++++++++++++
 10 files changed, 517 insertions(+), 99 deletions(-)

diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index b7694f6..a833a1d 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -90,6 +90,8 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
   private static final String SELECT_JOB_STATE_WITH_LIKE_TEMPLATE =
       "SELECT state FROM $TABLE$ WHERE store_name = ? and table_name like ?";
 
+  private static final String SELECT_ALL_JOBS_STATE = "SELECT state FROM $TABLE$";
+
   private static final String SELECT_JOB_STATE_EXISTS_TEMPLATE =
       "SELECT 1 FROM $TABLE$ WHERE store_name = ? and table_name = ?";
 
@@ -123,6 +125,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
 
   private final String UPSERT_JOB_STATE_SQL;
   private final String SELECT_JOB_STATE_SQL;
+  private final String SELECT_ALL_JOBS_STATE_SQL;
   private final String SELECT_JOB_STATE_WITH_LIKE_SQL;
   private final String SELECT_JOB_STATE_EXISTS_SQL;
   private final String SELECT_JOB_STATE_NAMES_SQL;
@@ -149,6 +152,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     UPSERT_JOB_STATE_SQL = UPSERT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_SQL = SELECT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_WITH_LIKE_SQL = SELECT_JOB_STATE_WITH_LIKE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+    SELECT_ALL_JOBS_STATE_SQL = SELECT_ALL_JOBS_STATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_EXISTS_SQL = SELECT_JOB_STATE_EXISTS_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     SELECT_JOB_STATE_NAMES_SQL = SELECT_JOB_STATE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     DELETE_JOB_STORE_SQL = DELETE_JOB_STORE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
@@ -335,33 +339,30 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
             SELECT_JOB_STATE_WITH_LIKE_SQL : SELECT_JOB_STATE_SQL)) {
       queryStatement.setString(1, storeName);
       queryStatement.setString(2, tableName);
-
-      try (ResultSet rs = queryStatement.executeQuery()) {
-        while (rs.next()) {
-          Blob blob = rs.getBlob(1);
-          Text key = new Text();
-
-          try (InputStream is = StreamUtils.isCompressed(blob.getBytes(1, 2)) ?
-              new GZIPInputStream(blob.getBinaryStream()) : blob.getBinaryStream();
-              DataInputStream dis = new DataInputStream(is)) {
-            // keep deserializing while we have data
-            while (dis.available() > 0) {
-              T state = this.stateClass.newInstance();
-              key.readString(dis);
-              state.readFields(dis);
-              states.add(state);
-            }
-          } catch (EOFException e) {
-            // no more data. GZIPInputStream.available() doesn't return 0 until after EOF.
-          }
-        }
-      }
+      execGetAllStatement(queryStatement, states);
     } catch (RuntimeException re) {
       throw re;
     } catch (Exception e) {
       throw new IOException("failure retrieving state from storeName " + storeName + " tableName " + tableName, e);
     }
+    return states;
+  }
+
+  /**
+   * An additional {@link #getAll()} method to retrieve all entries in a table.
+   *
+   */
+  public List<T> getAll() throws IOException {
+    List<T> states = Lists.newArrayList();
 
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement queryStatement = connection.prepareStatement(SELECT_ALL_JOBS_STATE_SQL)) {
+      execGetAllStatement(queryStatement, states);
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new IOException(String.format("failure retrieving all states with the SQL[%s]", SELECT_ALL_JOBS_STATE_SQL), e);
+    }
     return states;
   }
 
@@ -375,6 +376,36 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     return getAll(storeName, "%", true);
   }
 
+  /**
+   * An helper function extracted from getAll method originally that has side effects:
+   * - Executing queryStatement
+   * - Put the result into List<state> object.
+   * @throws SQLException
+   * @throws Exception
+   */
+  private void execGetAllStatement(PreparedStatement queryStatement, List<T> states) throws SQLException, Exception {
+    try (ResultSet rs = queryStatement.executeQuery()) {
+      while (rs.next()) {
+        Blob blob = rs.getBlob(1);
+        Text key = new Text();
+
+        try (InputStream is = StreamUtils.isCompressed(blob.getBytes(1, 2)) ?
+            new GZIPInputStream(blob.getBinaryStream()) : blob.getBinaryStream();
+            DataInputStream dis = new DataInputStream(is)) {
+          // keep deserializing while we have data
+          while (dis.available() > 0) {
+            T state = this.stateClass.newInstance();
+            key.readString(dis);
+            state.readFields(dis);
+            states.add(state);
+          }
+        } catch (EOFException e) {
+          // no more data. GZIPInputStream.available() doesn't return 0 until after EOF.
+        }
+      }
+    }
+  }
+
   @Override
   public List<String> getTableNames(String storeName, Predicate<String> predicate) throws IOException {
     List<String> names = Lists.newArrayList();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 8140648..27da869 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -98,7 +98,8 @@ import static org.apache.gobblin.service.ExecutionStatus.valueOf;
 public class DagManager extends AbstractIdleService {
   public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
 
-  private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+
   private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
   private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
   private static final Integer DEFAULT_NUM_THREADS = 3;
@@ -109,8 +110,6 @@ public class DagManager extends AbstractIdleService {
   private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
   private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
 
-  static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir";
-
   /**
    * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
    * <ul>
@@ -582,11 +581,20 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private void cleanUpDag(String dagId) {
-      Dag<JobExecutionPlan> dag = this.dags.get(dagId);
-      this.dagToJobs.remove(dagId);
-      this.dagStateStore.cleanUp(dag);
+    /**
+     * Note that removal of a {@link Dag} entry in {@link #dags} needs to be happen after {@link #cleanUp()}
+     * since the real {@link Dag} object is required for {@link #cleanUp()},
+     * and cleaning of all relevant states need to be atomic
+     * @param dagId
+     */
+    private synchronized void cleanUpDag(String dagId) {
+       try {
+         this.dagStateStore.cleanUp(dags.get(dagId));
+       } catch (IOException ioe) {
+         log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
+       }
       this.dags.remove(dagId);
+      this.dagToJobs.remove(dagId);
     }
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 1c6a086..14fc2bc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -30,6 +30,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -38,17 +39,40 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 public class DagManagerUtils {
+
+  static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    return new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+  }
+
+  static long getFlowExecId(Dag<JobExecutionPlan> dag) {
+    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    return jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+  }
+
   /**
    * Generate a dagId from the given {@link Dag} instance.
    * @param dag instance of a {@link Dag}.
    * @return a String id associated corresponding to the {@link Dag} instance.
    */
   static String generateDagId(Dag<JobExecutionPlan> dag) {
-    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
-    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    Long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
-    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+    FlowId flowId = getFlowId(dag);
+    Long flowExecutionId = getFlowExecId(dag);
+    return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName(), flowExecutionId);
+  }
+
+  /**
+   * Generate a FlowId from the given {@link Dag} instance.
+   * FlowId, comparing to DagId, doesn't contain FlowExecutionId so different {@link Dag} could possibly have same
+   * {@link FlowId}.
+   * @param dag
+   * @return
+   */
+  static String generateFlowIdInString(Dag<JobExecutionPlan> dag) {
+    FlowId flowId = getFlowId(dag);
+    return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName());
   }
 
   /**
@@ -57,12 +81,9 @@ public class DagManagerUtils {
    * @return fully qualified name of the underlying {@link Dag}.
    */
   static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
-    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
-    String flowGroup = ConfigUtils.getString(jobConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
-    String flowName = ConfigUtils.getString(jobConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
-    Long flowExecutionId = ConfigUtils.getLong(jobConfig, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
-
-    return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ", flowExecutionId: " + flowExecutionId + ")";
+    FlowId flowid = getFlowId(dag);
+    long flowExecutionId = getFlowExecId(dag);
+    return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
   }
 
   static String getJobName(DagNode<JobExecutionPlan> dagNode) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 55ce6f0..cee56c2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -44,7 +44,7 @@ public interface DagStateStore {
    * Delete the {@link Dag} from the backing store, typically upon completion of execution.
    * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
    */
-  void cleanUp(Dag<JobExecutionPlan> dag);
+  void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
 
   /**
    * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 62789a8..11e5f16 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -50,12 +50,13 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
 @Slf4j
 public class FSDagStateStore implements DagStateStore {
   public static final String DAG_FILE_EXTENSION = ".dag";
+  static final String DAG_STATESTORE_DIR = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreDir";
 
   private final String dagCheckpointDir;
   private final GsonSerDe<List<JobExecutionPlan>> serDe;
 
   public FSDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
-    this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
+    this.dagCheckpointDir = config.getString(DAG_STATESTORE_DIR);
     File checkpointDir = new File(this.dagCheckpointDir);
     if (!checkpointDir.exists()) {
       if (!checkpointDir.mkdirs()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
new file mode 100644
index 0000000..5e2bd83
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.GsonSerDe;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
+import static org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateFlowIdInString;
+import static org.apache.gobblin.service.modules.orchestration.DagManagerUtils.getFlowExecId;
+
+
+/**
+ * A implementation of {@link DagStateStore} using MySQL as a backup, leverage {@link MysqlStateStore}.
+ * It implements interfaces of {@link DagStateStore} but delegating responsibilities to methods provided
+ * in {@link MysqlStateStore}.
+ * It also implements conversion between {@link Dag<JobExecutionPlan>} to {@link State}.
+ *
+ * The schema of this will simply be:
+ * | storeName | tableName | State |
+ * where storeName represents FlowId, a combination of FlowGroup and FlowName, and tableName represents FlowExecutionId.
+ * State is a pocket for serialized {@link Dag} object.
+ *
+ *
+ */
+public class MysqlDagStateStore implements DagStateStore {
+
+  public static final String CONFIG_PREFIX = GOBBLIN_SERVICE_PREFIX + "mysqlDagStateStore";
+  public static final String DAG_KEY_IN_STATE = "dag";
+
+  /**
+   * The schema of {@link MysqlStateStore} is fixed but the columns are semantically projected into Dag's context:
+   * - The 'storeName' is FlowId.
+   * - The 'tableName' is FlowExecutionId.
+   */
+  private MysqlStateStore<State> mysqlStateStore;
+  private final GsonSerDe<List<JobExecutionPlan>> serDe;
+  private JobExecutionPlanDagFactory jobExecPlanDagFactory;
+
+  public MysqlDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+    if (config.hasPath(CONFIG_PREFIX)) {
+      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    }
+
+    this.mysqlStateStore = (MysqlStateStore<State>) createStateStore(config);
+
+    JsonSerializer<List<JobExecutionPlan>> serializer = new JobExecutionPlanListSerializer();
+    JsonDeserializer<List<JobExecutionPlan>> deserializer = new JobExecutionPlanListDeserializer(topologySpecMap);
+    Type typeToken = new TypeToken<List<JobExecutionPlan>>() {
+    }.getType();
+    this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
+    this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
+  }
+
+  /**
+   * Creating an instance of StateStore.
+   */
+  protected StateStore<State> createStateStore(Config config) {
+    try {
+      return (MysqlStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
+    } catch (ReflectiveOperationException rfoe) {
+      throw new RuntimeException("A MySQL StateStore cannot be correctly initialized due to:", rfoe);
+    }
+  }
+
+  @Override
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    mysqlStateStore.put(generateFlowIdInString(dag), getFlowExecId(dag) + "", convertDagIntoState(dag));
+  }
+
+  @Override
+  public void cleanUp(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    mysqlStateStore.delete(generateFlowIdInString(dag), getFlowExecId(dag) + "");
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags()
+      throws IOException {
+    return mysqlStateStore.getAll().stream().map(this::convertStateObjIntoDag).collect(Collectors.toList());
+  }
+
+  /**
+   * For {@link Dag} to work with {@link MysqlStateStore}, it needs to be packaged into a {@link State} object.
+   * The way that it does is simply serialize the {@link Dag} first and use the key {@link #DAG_KEY_IN_STATE}
+   * to be pair with it.
+   *
+   * The serialization step is required for readability and portability of serde lib.
+   * @param dag The dag to be converted.
+   * @return An {@link State} object that contains a single k-v pair for {@link Dag}.
+   */
+  private State convertDagIntoState(Dag<JobExecutionPlan> dag) {
+    State outputState = new State();
+
+    // Make sure the object has been serialized.
+    List<JobExecutionPlan> jobExecutionPlanList =
+        dag.getNodes().stream().map(Dag.DagNode::getValue).collect(Collectors.toList());
+    outputState.setProp(DAG_KEY_IN_STATE, serDe.serialize(jobExecutionPlanList));
+    return outputState;
+  }
+
+  /**
+   * Get the {@link Dag} out of a {@link State} pocket.
+   */
+  private Dag<JobExecutionPlan> convertStateObjIntoDag(State state) {
+    String serializedJobExecPlanList = state.getProp(DAG_KEY_IN_STATE);
+    return jobExecPlanDagFactory.createDag(serDe.deserialize(serializedJobExecPlanList));
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index a67e5dd..562f8d8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -44,7 +44,6 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
@@ -71,7 +70,7 @@ public class DagManagerTest {
   public void setUp() throws Exception {
     FileUtils.deleteDirectory(new File(this.dagStateStoreDir));
     Config config = ConfigFactory.empty()
-        .withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
+        .withValue(FSDagStateStore.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
 
     this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
     this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
new file mode 100644
index 0000000..f63c6ad
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+
+public class DagTestUtils {
+  private DagTestUtils() {
+
+  }
+
+  public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
+    String specStoreDir = "/tmp/specStoreDir";
+    Properties properties = new Properties();
+    properties.put("specStore.fs.dir", specStoreDir);
+    properties.put("specExecInstance.capabilities", "source:destination");
+    properties.put("specExecInstance.uri", specUriInString);
+    properties.put("uri",specUriInString);
+
+    Config specExecConfig = ConfigUtils.propertiesToConfig(properties);
+    SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(specExecConfig);
+    TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(new Path(specStoreDir).toUri())
+        .withConfig(specExecConfig)
+        .withDescription("test")
+        .withVersion("1")
+        .withSpecExecutor(specExecutorInstanceProducer);
+
+    return topologySpecBuilder.build();
+  }
+
+  /**
+   * Create a {@link Dag < JobExecutionPlan >} with one parent and one child.
+   * @return a Dag.
+   */
+  public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) throws URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
+      if (i > 0) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
+      }
+      JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+
+      SpecExecutor specExecutor = buildNaiveTopologySpec("mySpecExecutor").getSpecExecutor();
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
+      jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 8611b66..a38929a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -20,14 +20,11 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -38,17 +35,11 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 public class FSDagStateStoreTest {
@@ -64,63 +55,24 @@ public class FSDagStateStoreTest {
       throws IOException, URISyntaxException {
     this.checkpointDir = new File(dagStateStoreDir);
     FileUtils.deleteDirectory(this.checkpointDir);
-    Config config = ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(
+    Config config = ConfigFactory.empty().withValue(FSDagStateStore.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(
         this.dagStateStoreDir));
     this.topologySpecMap = new HashMap<>();
 
-    String specStoreDir = "/tmp/specStoreDir";
-    String specExecInstance = "mySpecExecutor";
-    Properties properties = new Properties();
-    properties.put("specStore.fs.dir", specStoreDir);
-    properties.put("specExecInstance.capabilities", "source:destination");
-    properties.put("specExecInstance.uri", specExecInstance);
-    properties.put("uri",specExecInstance);
-
-    Config specExecConfig = ConfigUtils.propertiesToConfig(properties);
-    SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(specExecConfig);
-    TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(new Path(specStoreDir).toUri())
-        .withConfig(specExecConfig)
-        .withDescription("test")
-        .withVersion("1")
-        .withSpecExecutor(specExecutorInstanceProducer);
-    this.topologySpec = topologySpecBuilder.build();
-    this.specExecURI = new URI(specExecInstance);
+    // Construct the TopologySpec and its map.
+    String specExecInstanceUriInString = "mySpecExecutor";
+    this.topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstanceUriInString);
+    this.specExecURI = new URI(specExecInstanceUriInString);
     this.topologySpecMap.put(this.specExecURI, topologySpec);
 
     this._dagStateStore = new FSDagStateStore(config, this.topologySpecMap);
   }
 
-  /**
-   * Create a {@link Dag<JobExecutionPlan>} with one parent and one child.
-   * @return a Dag.
-   */
-  public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) throws URISyntaxException {
-    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      String suffix = Integer.toString(i);
-      Config jobConfig = ConfigBuilder.create().
-          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
-          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
-          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
-          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
-      if (i > 0) {
-        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
-      }
-      JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig).
-          withTemplate(new URI("job" + suffix)).build();
-      SpecExecutor specExecutor = this.topologySpec.getSpecExecutor();
-      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
-      jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
-      jobExecutionPlans.add(jobExecutionPlan);
-    }
-    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
-  }
-
   @Test
   public void testWriteCheckpoint() throws IOException, URISyntaxException {
     long flowExecutionId = System.currentTimeMillis();
     String flowGroupId = "0";
-    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId);
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag(flowGroupId, flowExecutionId);
     this._dagStateStore.writeCheckpoint(dag);
 
     String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION;
@@ -148,7 +100,7 @@ public class FSDagStateStoreTest {
   public void testCleanUp() throws IOException, URISyntaxException {
     long flowExecutionId = System.currentTimeMillis();
     String flowGroupId = "0";
-    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId);
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag(flowGroupId, flowExecutionId);
     this._dagStateStore.writeCheckpoint(dag);
     String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION;
     File dagFile = new File(this.checkpointDir, fileName);
@@ -164,7 +116,7 @@ public class FSDagStateStoreTest {
     List<Long> flowExecutionIds = Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1);
     for (int i = 0; i < 2; i++) {
       String flowGroupId = Integer.toString(i);
-      Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionIds.get(i));
+      Dag<JobExecutionPlan> dag = DagTestUtils.buildDag(flowGroupId, flowExecutionIds.get(i));
       this._dagStateStore.writeCheckpoint(dag);
     }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
new file mode 100644
index 0000000..6ad85d6
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not Mysql-related components.
+ */
+public class MysqlDagStateStoreTest {
+
+  private DagStateStore _dagStateStore;
+  private Map<URI, TopologySpec> topologySpecMap;
+
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+
+  @BeforeClass
+  public void setUp() throws Exception {
+
+
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+
+    // Constructing TopologySpecMap.
+    this.topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    this.topologySpecMap.put(specExecURI, topologySpec);
+
+    this._dagStateStore = new TestMysqlDagStateStore(configBuilder.build(), this.topologySpecMap);
+  }
+
+
+  @Test
+  public void testWriteCheckpointAndGetAll() throws Exception{
+    Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
+    Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
+    _dagStateStore.writeCheckpoint(dag_0);
+    _dagStateStore.writeCheckpoint(dag_1);
+
+    List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
+    Assert.assertEquals(dags.size(), 2);
+
+    // Verify dag contents
+    Dag<JobExecutionPlan> dagDeserialized = dags.get(0);
+    Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+    Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+    Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+    Dag.DagNode<JobExecutionPlan> child = dagDeserialized.getEndNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> parent = dagDeserialized.getStartNodes().get(0);
+    Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+    Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+    for (int i = 0; i < 2; i++) {
+      JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+      Config jobConfig = plan.getJobSpec().getConfig();
+      Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), "group" + "random_0");
+      Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), "flow" + "random_0");
+      Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 123L);
+      Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+    }
+
+    dagDeserialized = dags.get(1);
+    Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+    Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+    Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+    child = dagDeserialized.getEndNodes().get(0);
+    parent = dagDeserialized.getStartNodes().get(0);
+    Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+    Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+    for (int i = 0; i < 2; i++) {
+      JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+      Config jobConfig = plan.getJobSpec().getConfig();
+      Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), "group" + "random_1");
+      Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), "flow" + "random_1");
+      Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 456L);
+      Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+    }
+  }
+
+  @Test (dependsOnMethods = "testWriteCheckpointAndGetAll")
+  public void testCleanUp() throws Exception {
+    Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
+    Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
+    _dagStateStore.writeCheckpoint(dag_0);
+    _dagStateStore.writeCheckpoint(dag_1);
+
+    List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
+    Assert.assertEquals(dags.size(), 2);
+
+    _dagStateStore.cleanUp(dags.get(0));
+    _dagStateStore.cleanUp(dags.get(1));
+
+    dags = _dagStateStore.getDags();
+    Assert.assertEquals(dags.size(), 0);
+  }
+
+  /**
+   * Only overwrite {@link #createStateStore(Config)} method to directly return a mysqlStateStore
+   * backed by mocked db.
+   */
+  public class TestMysqlDagStateStore extends MysqlDagStateStore {
+    public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+      super(config, topologySpecMap);
+    }
+
+    @Override
+    protected StateStore<State> createStateStore(Config config) {
+      try {
+        // Setting up mock DB
+        ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+        String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
+        BasicDataSource mySqlDs = new BasicDataSource();
+
+        mySqlDs.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+        mySqlDs.setDefaultAutoCommit(false);
+        mySqlDs.setUrl(jdbcUrl);
+        mySqlDs.setUsername(TEST_USER);
+        mySqlDs.setPassword(TEST_PASSWORD);
+
+        return new MysqlStateStore<>(mySqlDs, TEST_DAG_STATE_STORE, false, State.class);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file