You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/10 18:59:33 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-725] add a mysql based job status retriever

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b28ddaf  [GOBBLIN-725] add a mysql based job status retriever
b28ddaf is described below

commit b28ddaf16948dd270b43ac2fc2b27824d4b6a5f8
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Apr 10 11:59:27 2019 -0700

    [GOBBLIN-725] add a mysql based job status retriever
    
    Closes #2592 from
    arjun4084346/jobstatusstoretomysql
---
 .../gobblin/configuration/ConfigurationKeys.java   |   1 +
 .../metastore/FileContextBasedFsStateStore.java    |   1 +
 .../metastore/MysqlJobStatusStateStore.java        |  58 +++++++++
 .../metastore/MysqlJobStatusStateStoreFactory.java |  47 ++++++++
 .../org/apache/gobblin/kafka/KafkaTestBase.java    |   4 +
 .../metrics/kafka/KafkaEventKeyValueReporter.java  |   2 +-
 .../org/apache/gobblin/service/FlowStatusTest.java |   7 ++
 .../service/monitoring/JobStatusRetriever.java     |  36 +++++-
 gobblin-service/build.gradle                       |   1 +
 .../service/monitoring/FsJobStatusRetriever.java   |  49 ++------
 .../service/monitoring/KafkaJobStatusMonitor.java  |  29 ++++-
 .../monitoring/MysqlJobStatusRetriever.java        | 103 ++++++++++++++++
 .../service/modules/core/GobblinServiceHATest.java |  30 ++---
 .../modules/core/GobblinServiceManagerTest.java    |  14 +++
 .../monitoring/FsJobStatusRetrieverTest.java       | 129 +--------------------
 ...rieverTest.java => JobStatusRetrieverTest.java} | 119 ++++++++-----------
 .../monitoring/KafkaAvroJobStatusMonitorTest.java  |   8 +-
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  54 +++++++++
 18 files changed, 425 insertions(+), 267 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 75fbc35..3b30c43 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -50,6 +50,7 @@ public class ConfigurationKeys {
   public static final String STATE_STORE_TYPE_KEY = "state.store.type";
   public static final String DATASET_STATE_STORE_PREFIX = "dataset";
   public static final String DATASET_STATE_STORE_TYPE_KEY = DATASET_STATE_STORE_PREFIX + ".state.store.type";
+  public static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
   public static final String INTERMEDIATE_STATE_STORE_PREFIX = "intermediate";
   public static final String INTERMEDIATE_STATE_STORE_TYPE_KEY = INTERMEDIATE_STATE_STORE_PREFIX + ".state.store.type";
   public static final String DEFAULT_STATE_STORE_TYPE = "fs";
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
index ade7a50..1fb74a4 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
@@ -49,6 +49,7 @@ import org.apache.gobblin.util.HadoopUtils;
 public class FileContextBasedFsStateStore<T extends State> extends FsStateStore<T> {
   private FileContext fc;
 
+
   public FileContextBasedFsStateStore(String fsUri, String storeRootDir, Class stateClass)
       throws IOException {
     super(fsUri, storeRootDir, stateClass);
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
new file mode 100644
index 0000000..e84df60
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * An implementation of {@link MysqlStateStore} backed by MySQL to store JobStatuses.
+ *
+ * @param <T> state object type
+ **/
+public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore {
+  /**
+   * Manages the persistence and retrieval of {@link State} in a MySQL database
+   * @param dataSource the {@link DataSource} object for connecting to MySQL
+   * @param stateStoreTableName the table for storing the state in rows keyed by two levels (store_name, table_name)
+   * @param compressedValues should values be compressed for storage?
+   * @param stateClass class of the {@link State}s stored in this state store
+   * @throws IOException
+   */
+  public MysqlJobStatusStateStore(DataSource dataSource, String stateStoreTableName, boolean compressedValues,
+      Class<T> stateClass)
+      throws IOException {
+    super(dataSource, stateStoreTableName, compressedValues, stateClass);
+  }
+
+  /**
+   * Returns all the job statuses for a flow group, flow name, flow execution id
+   * @param storeName
+   * @param flowExecutionId
+   * @return
+   * @throws IOException
+   */
+  public List<T> getAll(String storeName, long flowExecutionId) throws IOException {
+    return getAll(storeName, flowExecutionId + "%", true);
+  }
+
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
new file mode 100644
index 0000000..f571046
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory {
+  @Override
+  public <T extends State> MysqlJobStatusStateStore<T> createStateStore(Config config, Class<T> stateClass) {
+    String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+    boolean compressedValues = ConfigUtils.getBoolean(config, ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES);
+
+    try {
+      BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
+          SharedResourcesBrokerFactory.getImplicitBroker());
+
+      return new MysqlJobStatusStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create MysqlStateStore with factory", e);
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
index a89f29c..3dd0cd0 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
@@ -247,5 +247,9 @@ public class KafkaTestBase implements Closeable {
   public int getKafkaServerPort() {
     return _kafkaServerSuite.getKafkaServerPort();
   }
+
+  public String getZkConnectString() {
+    return _kafkaServerSuite.getZkConnectString();
+  }
 }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
index cfe0dbc..983643e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
@@ -63,7 +63,7 @@ public class KafkaEventKeyValueReporter extends KafkaEventReporter {
           if (nextEvent.getMetadata().containsKey(keyPart)) {
             sb.append(nextEvent.getMetadata().get(keyPart));
           } else {
-            log.error("{} not found in the GobblinTrackingEvent. Setting key to null.", keyPart);
+            log.debug("{} not found in the GobblinTrackingEvent. Setting key to null.", keyPart);
             sb = null;
             break;
           }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index f339eb2..c908b83 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -39,6 +39,8 @@ import com.google.inject.name.Names;
 import com.linkedin.restli.server.resources.BaseResource;
 
 import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -64,6 +66,11 @@ public class FlowStatusTest {
     }
 
     @Override
+    public StateStore<State> getStateStore() {
+      return null;
+    }
+
+    @Override
     public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
       if (_listOfJobStatusLists == null) {
         return null;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 7f60d61..91bfa7e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -22,7 +22,13 @@ import java.util.List;
 
 import com.google.common.collect.Iterators;
 
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.event.TimingEvent;
 
 
 /**
@@ -32,7 +38,6 @@ import org.apache.gobblin.annotation.Alpha;
 public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
   public static final String EVENT_NAME_FIELD = "eventName";
   public static final String NA_KEY = "NA";
-  public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
 
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
       long flowExecutionId);
@@ -55,4 +60,33 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     return latestExecutionId == -1L ? Iterators.<JobStatus>emptyIterator()
         : getJobStatusesForFlowExecution(flowName, flowGroup, latestExecutionId);
   }
+
+
+  /**
+   *
+   * @param jobState instance of {@link State}
+   * @return deserialize {@link State} into a {@link JobStatus}.
+   */
+  protected JobStatus getJobStatus(State jobState) {
+    String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+    long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+    String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+    String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+    long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
+    long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
+    String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
+    String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
+    String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
+    long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
+
+    return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+        jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
+        lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
+        message(message).processedCount(processedCount).build();
+  }
+
+  public abstract StateStore<State> getStateStore();
 }
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index d7e8ea2..ab56a3e 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -72,6 +72,7 @@ dependencies {
 
   testCompile project(":gobblin-example")
   testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
+  testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.byteman
   testCompile externalDependency.bytemanBmunit
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 8312d69..f35e076 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -67,7 +67,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
     Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
 
     Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
-    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     try {
       List<JobStatus> jobStatuses = new ArrayList<>();
       List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
@@ -96,9 +96,8 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
     Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");
 
     try {
-      String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
-      String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName,
-          KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+      String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+      String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
       List<State> jobStates = this.stateStore.getAll(storeName, tableName);
       if (jobStates.isEmpty()) {
         return Iterators.emptyIterator();
@@ -122,13 +121,11 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
     Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
     Preconditions.checkArgument(count > 0, "Number of execution ids must be at least 1.");
     try {
-      String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+      String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
       List<String> tableNames = this.stateStore.getTableNames(storeName, input -> true);
-      if (tableNames.isEmpty()) {
-        return null;
-      }
-      Set<Long> flowExecutionIds =
-          new TreeSet<>(tableNames.stream().map(this::getExecutionIdFromTableName).collect(Collectors.toList())).descendingSet();
+      Set<Long> flowExecutionIds = new TreeSet<>(tableNames.stream()
+          .map(KafkaJobStatusMonitor::getExecutionIdFromTableName)
+          .collect(Collectors.toList())).descendingSet();
       return ImmutableList.copyOf(Iterables.limit(flowExecutionIds, count));
     } catch (Exception e) {
       return null;
@@ -136,36 +133,6 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
   }
 
   /**
-   *
-   * @param jobState instance of {@link State}
-   * @return deserialize {@link State} into a {@link JobStatus}.
-   */
-  private JobStatus getJobStatus(State jobState) {
-    String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
-    long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
-    String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
-    String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
-    long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
-    String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-    long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
-    long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
-    String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
-    String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
-    String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
-    long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
-
-    return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-        jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
-        lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
-        message(message).processedCount(processedCount).build();
-  }
-
-  private long getExecutionIdFromTableName(String tableName) {
-    return Long.parseLong(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
-  }
-
-  /**
    * A helper method to determine if {@link JobStatus}es for jobs without a jobGroup/jobName should be filtered out.
    * Once a job has been orchestrated, {@link JobStatus}es without a jobGroup/jobName can be filtered out.
    * @param tableNames
@@ -174,6 +141,6 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
    */
   private boolean shouldFilterJobStatus(List<String> tableNames, String tableName) {
     return tableNames.size() > 1 && JobStatusRetriever.NA_KEY
-        .equals(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
+        .equals(Splitter.on(KafkaJobStatusMonitor.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 179a107..1675dea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -32,8 +33,10 @@ import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metastore.MysqlStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -53,16 +56,15 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   //We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
   public static final String STATE_STORE_TABLE_SUFFIX = "gst";
+  public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
 
   static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
   static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
   static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
   static final String DEFAULT_JOB_STATUS_MONITOR_CLASS = KafkaAvroJobStatusMonitor.class.getName();
-  static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
 
   private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
   private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
-  private static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
 
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
@@ -73,7 +75,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
-    String stateStoreFactoryClass = ConfigUtils.getString(config, STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
+    String stateStoreFactoryClass = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
 
     this.stateStore =
         ((StateStore.Factory) Class.forName(stateStoreFactoryClass).newInstance()).createStateStore(config, org.apache.gobblin.configuration.State.class);
@@ -125,13 +127,30 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
       throws IOException {
     String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
     String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
     String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
     String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,JobStatusRetriever.NA_KEY);
     String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
-    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+
+    String storeName = jobStatusStoreName(flowGroup, flowName);
+    String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
     this.stateStore.put(storeName, tableName, jobStatus);
   }
 
+  public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
+    return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+  }
+
+  public static String jobStatusTableName(long flowExecutionId, String jobGroup, String jobName) {
+    return jobStatusTableName(String.valueOf(flowExecutionId), jobGroup, jobName);
+  }
+
+  public static String jobStatusStoreName(String flowGroup, String flowName) {
+    return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+  }
+
+  public static long getExecutionIdFromTableName(String tableName) {
+    return Long.parseLong(Splitter.on(STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
+  }
+
   public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
new file mode 100644
index 0000000..627e421
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+/**
+ * Mysql based Retriever for {@link JobStatus}.
+ */
+public class MysqlJobStatusRetriever extends JobStatusRetriever {
+
+  public static final String CONF_PREFIX = "mysqlJobStatusRetriever";
+  @Getter
+  private MysqlJobStatusStateStore<State> stateStore;
+
+  public MysqlJobStatusRetriever(Config config) throws ReflectiveOperationException {
+    config = config.getConfig(CONF_PREFIX).withFallback(config);
+    this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+    try {
+      List<State> jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
+      return filterAndGetJobStatuses(jobStatusStates);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId,
+      String jobName, String jobGroup) {
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+    String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
+
+    try {
+      List<State> jobStatusStates = this.stateStore.getAll(storeName, tableName);
+      return filterAndGetJobStatuses(jobStatusStates);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+
+    try {
+      List<State> jobStatusStates = this.stateStore.getAll(storeName);
+      List<Long> flowExecutionIds = jobStatusStates.stream()
+          .map(state -> Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
+          .collect(Collectors.toList());
+      return ImmutableList.copyOf(Iterables.limit(new TreeSet<>(flowExecutionIds).descendingSet(), count));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Iterator<JobStatus> filterAndGetJobStatuses(List<State> jobStatusStates) {
+    int totalJobStatuses = jobStatusStates.size();
+
+    return jobStatusStates.stream()
+        .filter(state -> !(totalJobStatuses > 1 && state.getProp(JobStatusRetriever.EVENT_NAME_FIELD)
+            .equals(ExecutionStatus.COMPILED.name())))
+        .map(this::getJobStatus)
+        .iterator();
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 11b2c62..efb1a6d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -34,23 +34,18 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.client.RestLiResponseException;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
@@ -59,7 +54,6 @@ import org.apache.gobblin.util.ConfigUtils;
 public class GobblinServiceHATest {
 
   private static final Logger logger = LoggerFactory.getLogger(GobblinServiceHATest.class);
-  private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
 
   private static final String QUARTZ_INSTANCE_NAME = "org.quartz.scheduler.instanceName";
   private static final String QUARTZ_THREAD_POOL_COUNT = "org.quartz.threadPool.threadCount";
@@ -91,22 +85,11 @@ public class GobblinServiceHATest {
   private static final String TEST_FLOW_NAME_2 = "testFlow2";
   private static final String TEST_SCHEDULE_2 = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI_2 = "FS:///templates/test.template";
-  private static final String TEST_DUMMY_GROUP_NAME_2 = "dummyGroup";
-  private static final String TEST_DUMMY_FLOW_NAME_2 = "dummyFlow";
 
   private static final String TEST_GOBBLIN_EXECUTOR_NAME = "testGobblinExecutor";
   private static final String TEST_SOURCE_NAME = "testSource";
   private static final String TEST_SINK_NAME = "testSink";
 
-  private ServiceBasedAppLauncher serviceLauncher;
-  private TopologyCatalog topologyCatalog;
-  private TopologySpec topologySpec;
-
-  private FlowCatalog flowCatalog;
-  private FlowSpec flowSpec;
-
-  private Orchestrator orchestrator;
-
   private GobblinServiceManager node1GobblinServiceManager;
   private FlowConfigClient node1FlowConfigClient;
 
@@ -133,6 +116,8 @@ public class GobblinServiceHATest {
     logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
     HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);
 
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
     Properties commonServiceCoreProperties = new Properties();
     commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
     commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
@@ -148,6 +133,11 @@ public class GobblinServiceHATest {
         "org.gobblin.service.InMemorySpecExecutor");
     commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
         TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
+    commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
 
     Properties node1ServiceCoreProperties = new Properties();
     node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 32dc529..efe1b95 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -48,6 +48,10 @@ import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.client.RestLiResponseException;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.service.FlowConfig;
@@ -93,8 +97,18 @@ public class GobblinServiceManagerTest {
   public void setup() throws Exception {
     cleanUpDir(SERVICE_WORK_DIR);
     cleanUpDir(SPEC_STORE_PARENT_DIR);
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    KafkaTestBase kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
 
     Properties serviceCoreProperties = new Properties();
+    serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
+    serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
+    serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
+    serviceCoreProperties.put("zookeeper.connect", kafkaTestHelper.getZkConnectString());
+    serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
+
     serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
     serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index fed5c47..fac2ff0 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -41,140 +41,23 @@ import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
 
 
-public class FsJobStatusRetrieverTest {
-  private FsJobStatusRetriever jobStatusRetriever;
-  private FileContextBasedFsStateStore fsStateStore;
-  private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
+public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
 
-  private String flowGroup = "myFlowGroup";
-  private String flowName = "myFlowName";
-  private String jobGroup;
-  private String myJobGroup = "myJobGroup";
-  private long jobExecutionId = 1111L;
-  private String message = "https://myServer:8143/1234/1111";
+  private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
 
   @BeforeClass
   public void setUp() throws Exception {
-    cleanUpDir(stateStoreDir);
+    cleanUpDir();
     Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
         ConfigValueFactory.fromAnyRef(stateStoreDir));
     this.jobStatusRetriever = new FsJobStatusRetriever(config);
-    this.fsStateStore = this.jobStatusRetriever.getStateStore();
-  }
-
-  private void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
-    Properties properties = new Properties();
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
-    if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
-      this.jobGroup = myJobGroup;
-      properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
-      properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(this.jobExecutionId));
-      properties.setProperty(TimingEvent.METADATA_MESSAGE, this.message);
-      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
-    } else {
-      this.jobGroup = JobStatusRetriever.NA_KEY;
-      properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
-      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
-    }
-    properties.setProperty(TimingEvent.METADATA_START_TIME, "1");
-    properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
-    State jobStatus = new State(properties);
-
-    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
-    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
-
-    this.fsStateStore.put(storeName, tableName, jobStatus);
   }
 
-  @Test
-  public void testGetJobStatusesForFlowExecution() throws IOException {
-    Long flowExecutionId = 1234L;
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
-
-    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
-    Assert.assertTrue(jobStatusIterator.hasNext());
-    JobStatus jobStatus = jobStatusIterator.next();
-    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
-    Assert.assertEquals(jobStatus.getJobName(), (JobStatusRetriever.NA_KEY));
-    Assert.assertEquals(jobStatus.getJobGroup(), JobStatusRetriever.NA_KEY);
-    Assert.assertEquals(jobStatus.getProcessedCount(), 0);
-    Assert.assertEquals(jobStatus.getLowWatermark(), "");
-    Assert.assertEquals(jobStatus.getHighWatermark(), "");
-
-    addJobStatusToStateStore(flowExecutionId,"myJobName1");
-    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
-    jobStatus = jobStatusIterator.next();
-    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
-    Assert.assertEquals(jobStatus.getJobName(), "myJobName1");
-    Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
-    Assert.assertFalse(jobStatusIterator.hasNext());
-
-    addJobStatusToStateStore(flowExecutionId,"myJobName2");
-    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
-    Assert.assertTrue(jobStatusIterator.hasNext());
-    jobStatus = jobStatusIterator.next();
-    Assert.assertTrue(jobStatus.getJobName().equals("myJobName1") || jobStatus.getJobName().equals("myJobName2"));
-
-    String jobName = jobStatus.getJobName();
-    String nextExpectedJobName = ("myJobName1".equals(jobName)) ? "myJobName2" : "myJobName1";
-    Assert.assertTrue(jobStatusIterator.hasNext());
-    jobStatus = jobStatusIterator.next();
-    Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
-  }
-
-  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
-  public void testGetJobStatusesForFlowExecution1() {
-    long flowExecutionId = 1234L;
-    String jobName = "myJobName1";
-    String jobGroup = "myJobGroup";
-    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
-
-    Assert.assertTrue(jobStatusIterator.hasNext());
-    JobStatus jobStatus = jobStatusIterator.next();
-    Assert.assertEquals(jobStatus.getJobName(), jobName);
-    Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
-    Assert.assertEquals(jobStatus.getJobExecutionId(), jobExecutionId);
-    Assert.assertEquals(jobStatus.getFlowName(), flowName);
-    Assert.assertEquals(jobStatus.getFlowGroup(), flowGroup);
-    Assert.assertEquals(jobStatus.getFlowExecutionId(), flowExecutionId);
-    Assert.assertEquals(jobStatus.getMessage(), message);
-  }
-
-  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
-  public void testGetLatestExecutionIdsForFlow() throws Exception {
-    //Add new flow execution to state store
-    long flowExecutionId1 = 1235L;
-    addJobStatusToStateStore(flowExecutionId1, "myJobName1");
-    long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
-    Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
-
-    long flowExecutionId2 = 1236L;
-    addJobStatusToStateStore(flowExecutionId2, "myJobName1");
-
-    //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions i.e. 1235 and 1236.
-    List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 2);
-    Assert.assertEquals(latestFlowExecutionIds.size(), 2);
-    Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) flowExecutionId2);
-    Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) flowExecutionId1);
-
-    //Remove all flow executions from state store
-    cleanUpDir(stateStoreDir);
-    Assert.assertNull(this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1));
-    Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup), -1L);
-  }
-
-  private void cleanUpDir(String dir) throws Exception {
-    File specStoreDir = new File(dir);
+  @Override
+  protected void cleanUpDir() throws Exception {
+    File specStoreDir = new File(this.stateStoreDir);
     if (specStoreDir.exists()) {
       FileUtils.deleteDirectory(specStoreDir);
     }
   }
-
-  @AfterClass
-  public void tearDown() throws Exception {
-    cleanUpDir(stateStoreDir);
-  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
similarity index 57%
copy from gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
copy to gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index fed5c47..83cb0fe 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -17,65 +17,47 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.commons.io.FileUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Joiner;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
 
 
-public class FsJobStatusRetrieverTest {
-  private FsJobStatusRetriever jobStatusRetriever;
-  private FileContextBasedFsStateStore fsStateStore;
-  private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
-
-  private String flowGroup = "myFlowGroup";
-  private String flowName = "myFlowName";
+public abstract class JobStatusRetrieverTest {
+  protected static final String FLOW_GROUP = "myFlowGroup";
+  protected static final String FLOW_NAME = "myFlowName";
   private String jobGroup;
-  private String myJobGroup = "myJobGroup";
-  private long jobExecutionId = 1111L;
-  private String message = "https://myServer:8143/1234/1111";
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    cleanUpDir(stateStoreDir);
-    Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
-        ConfigValueFactory.fromAnyRef(stateStoreDir));
-    this.jobStatusRetriever = new FsJobStatusRetriever(config);
-    this.fsStateStore = this.jobStatusRetriever.getStateStore();
-  }
+  private static final String myJobGroup = "myJobGroup";
+  private static final String MY_JOB_NAME_1 = "myJobName1";
+  private static final String MY_JOB_NAME_2 = "myJobName2";
+  private static final long JOB_EXECUTION_ID = 1111L;
+  private static final String MESSAGE = "https://myServer:8143/1234/1111";
+  JobStatusRetriever jobStatusRetriever;
+
+  abstract void setUp() throws Exception;
 
-  private void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
+  protected void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
     Properties properties = new Properties();
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
-    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, FLOW_GROUP);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, FLOW_NAME);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
     if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
-      this.jobGroup = myJobGroup;
+      jobGroup = myJobGroup;
       properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
-      properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(this.jobExecutionId));
-      properties.setProperty(TimingEvent.METADATA_MESSAGE, this.message);
+      properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(JOB_EXECUTION_ID));
+      properties.setProperty(TimingEvent.METADATA_MESSAGE, MESSAGE);
       properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
     } else {
-      this.jobGroup = JobStatusRetriever.NA_KEY;
+      jobGroup = JobStatusRetriever.NA_KEY;
       properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
       properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
     }
@@ -83,10 +65,10 @@ public class FsJobStatusRetrieverTest {
     properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
     State jobStatus = new State(properties);
 
-    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
-    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME);
+    String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
 
-    this.fsStateStore.put(storeName, tableName, jobStatus);
+    this.jobStatusRetriever.getStateStore().put(storeName, tableName, jobStatus);
   }
 
   @Test
@@ -94,7 +76,8 @@ public class FsJobStatusRetrieverTest {
     Long flowExecutionId = 1234L;
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
 
-    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    Iterator<JobStatus>
+        jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
     JobStatus jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
@@ -104,22 +87,22 @@ public class FsJobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getLowWatermark(), "");
     Assert.assertEquals(jobStatus.getHighWatermark(), "");
 
-    addJobStatusToStateStore(flowExecutionId,"myJobName1");
-    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1);
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
-    Assert.assertEquals(jobStatus.getJobName(), "myJobName1");
+    Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
     Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
     Assert.assertFalse(jobStatusIterator.hasNext());
 
-    addJobStatusToStateStore(flowExecutionId,"myJobName2");
-    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2);
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
     jobStatus = jobStatusIterator.next();
-    Assert.assertTrue(jobStatus.getJobName().equals("myJobName1") || jobStatus.getJobName().equals("myJobName2"));
+    Assert.assertTrue(jobStatus.getJobName().equals(MY_JOB_NAME_1) || jobStatus.getJobName().equals(MY_JOB_NAME_2));
 
     String jobName = jobStatus.getJobName();
-    String nextExpectedJobName = ("myJobName1".equals(jobName)) ? "myJobName2" : "myJobName1";
+    String nextExpectedJobName = (MY_JOB_NAME_1.equals(jobName)) ? MY_JOB_NAME_2 : MY_JOB_NAME_1;
     Assert.assertTrue(jobStatusIterator.hasNext());
     jobStatus = jobStatusIterator.next();
     Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
@@ -128,53 +111,47 @@ public class FsJobStatusRetrieverTest {
   @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
   public void testGetJobStatusesForFlowExecution1() {
     long flowExecutionId = 1234L;
-    String jobName = "myJobName1";
-    String jobGroup = "myJobGroup";
-    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId,
+        MY_JOB_NAME_1, myJobGroup);
 
     Assert.assertTrue(jobStatusIterator.hasNext());
     JobStatus jobStatus = jobStatusIterator.next();
-    Assert.assertEquals(jobStatus.getJobName(), jobName);
-    Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
-    Assert.assertEquals(jobStatus.getJobExecutionId(), jobExecutionId);
-    Assert.assertEquals(jobStatus.getFlowName(), flowName);
-    Assert.assertEquals(jobStatus.getFlowGroup(), flowGroup);
+    Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
+    Assert.assertEquals(jobStatus.getJobGroup(), myJobGroup);
+    Assert.assertEquals(jobStatus.getJobExecutionId(), JOB_EXECUTION_ID);
+    Assert.assertEquals(jobStatus.getFlowName(), FLOW_NAME);
+    Assert.assertEquals(jobStatus.getFlowGroup(), FLOW_GROUP);
     Assert.assertEquals(jobStatus.getFlowExecutionId(), flowExecutionId);
-    Assert.assertEquals(jobStatus.getMessage(), message);
+    Assert.assertEquals(jobStatus.getMessage(), MESSAGE);
   }
 
   @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
   public void testGetLatestExecutionIdsForFlow() throws Exception {
     //Add new flow execution to state store
     long flowExecutionId1 = 1235L;
-    addJobStatusToStateStore(flowExecutionId1, "myJobName1");
-    long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+    addJobStatusToStateStore(flowExecutionId1, MY_JOB_NAME_1);
+    long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP);
     Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
 
     long flowExecutionId2 = 1236L;
-    addJobStatusToStateStore(flowExecutionId2, "myJobName1");
+    addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1);
 
     //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions i.e. 1235 and 1236.
-    List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 2);
+    List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 2);
     Assert.assertEquals(latestFlowExecutionIds.size(), 2);
     Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) flowExecutionId2);
     Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) flowExecutionId1);
 
     //Remove all flow executions from state store
-    cleanUpDir(stateStoreDir);
-    Assert.assertNull(this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1));
-    Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup), -1L);
+    cleanUpDir();
+    Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 1).size(), 0);
+    Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP), -1L);
   }
 
-  private void cleanUpDir(String dir) throws Exception {
-    File specStoreDir = new File(dir);
-    if (specStoreDir.exists()) {
-      FileUtils.deleteDirectory(specStoreDir);
-    }
-  }
+  abstract void cleanUpDir() throws Exception;
 
   @AfterClass
   public void tearDown() throws Exception {
-    cleanUpDir(stateStoreDir);
+    cleanUpDir();
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index d34fafd..ad48f34 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -126,9 +126,8 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.processMessage(messageAndMetadata);
 
     StateStore stateStore = jobStatusMonitor.getStateStore();
-    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowGroup, this.flowName);
-    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowExecutionId, "NA", "NA",
-        KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+    String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
     List<State> stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
     State state = stateList.get(0);
@@ -137,8 +136,7 @@ public class KafkaAvroJobStatusMonitorTest {
     messageAndMetadata = iterator.next();
     jobStatusMonitor.processMessage(messageAndMetadata);
 
-    tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowExecutionId, this.jobGroup, this.jobName,
-        KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
     state = stateList.get(0);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
new file mode 100644
index 0000000..45b80a8
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.testng.annotations.BeforeClass;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+
+public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
+  private MysqlJobStatusStateStore dbJobStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+
+  @BeforeClass
+  @Override
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
+
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
+    configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
+    configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
+
+    this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build());
+    this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();
+    cleanUpDir();
+  }
+
+  @Override
+  void cleanUpDir() throws Exception {
+    this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME));
+  }
+}