You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/08/12 04:01:12 UTC

[gobblin] branch master updated: [GOBBLIN-1457] Add persistence for troubleshooter in Gobblin service (#3327)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new de3a694  [GOBBLIN-1457] Add persistence for troubleshooter in Gobblin service (#3327)
de3a694 is described below

commit de3a6941d0d9ab0eeb1035a8d8ea04aaa65bf07e
Author: Alex Prokofiev <ap...@linkedin.com>
AuthorDate: Wed Aug 11 21:01:05 2021 -0700

    [GOBBLIN-1457] Add persistence for troubleshooter in Gobblin service (#3327)
    
    Previously, Gobblin service kept the last few job issues in memory.
    In this commit, we add MySql-based persistence for job issues.
    
    We also introduce Flyway-based migrations to Gobblin service DB, so that
    we can adjust the table schemas in the future, and add new tables
    in a consistent way.
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  34 ++
 .../apache/gobblin/util/ClassAliasResolver.java    |   3 +-
 .../gobblin/metastore/DatabaseJobHistoryStore.java |   3 +-
 .../apache/gobblin/metastore/MysqlStateStore.java  |   5 +-
 .../util/DatabaseJobHistoryStoreSchemaManager.java |  17 +-
 .../metastore/util/MysqlDataSourceUtils.java       |  32 +-
 .../InMemoryMultiContextIssueRepository.java       |  85 +++--
 .../gobblin/runtime/troubleshooter/Issue.java      |  10 +
 .../troubleshooter/JobIssueEventHandler.java       |  13 +-
 .../MultiContextIssueRepository.java               |  16 +-
 .../InMemoryMultiContextIssueRepositoryTest.java   |   4 +-
 .../troubleshooter/JobIssueEventHandlerTest.java   |   2 +-
 .../MultiContextIssueRepositoryTest.java           |   5 +-
 gobblin-service/build.gradle                       |   5 +
 .../modules/core/GobblinServiceGuiceModule.java    |  19 +-
 .../modules/core/GobblinServiceManager.java        |  11 +
 .../service/modules/db/ServiceDatabaseManager.java |  64 ++++
 .../modules/db/ServiceDatabaseProvider.java        |  22 +-
 .../modules/db/ServiceDatabaseProviderImpl.java    | 116 ++++++
 .../modules/orchestration/Orchestrator.java        |   6 +-
 .../MySqlMultiContextIssueRepository.java          | 292 ++++++++++++++
 .../service/db/migration/V1__AddIssuesTable.sql    |  55 +++
 .../gobblin/service/GobblinServiceManagerTest.java |  12 +
 .../gobblin/service/TestServiceDatabaseConfig.java |  22 +-
 .../service/modules/core/GobblinServiceHATest.java |  14 +
 .../modules/core/GobblinServiceRedirectTest.java   |  13 +
 .../MySqlMultiContextIssueRepositoryTest.java      | 419 +++++++++++++++++++++
 gradle/scripts/dependencyDefinitions.gradle        |   9 +-
 28 files changed, 1198 insertions(+), 110 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 773d3a0..7697bc2 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service;
 
+import java.time.Duration;
+
 import org.apache.gobblin.annotation.Alpha;
 
 @Alpha
@@ -147,4 +149,36 @@ public class ServiceConfigKeys {
   public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
   public static final String DAG_STORE_KEY_SEPARATION_CHARACTER = "_";
 
+
+  // Service database connection
+
+  public static final String SERVICE_DB_URL_KEY = GOBBLIN_SERVICE_PREFIX + "db.url";
+  public static final String SERVICE_DB_USERNAME = GOBBLIN_SERVICE_PREFIX + "db.username";
+  public static final String SERVICE_DB_PASSWORD = GOBBLIN_SERVICE_PREFIX + "db.password";
+  public static final String SERVICE_DB_MAX_CONNECTIONS = GOBBLIN_SERVICE_PREFIX + "db.maxConnections";
+  public static final String SERVICE_DB_MAX_CONNECTION_LIFETIME = GOBBLIN_SERVICE_PREFIX + "db.maxConnectionLifetime";
+
+  // Mysql-based issues repository
+  public static final String MYSQL_ISSUE_REPO_PREFIX = GOBBLIN_SERVICE_PREFIX + "issueRepo.mysql.";
+
+  public static final String MYSQL_ISSUE_REPO_CLEANUP_INTERVAL = MYSQL_ISSUE_REPO_PREFIX + "cleanupInterval";
+  public static final Duration DEFAULT_MYSQL_ISSUE_REPO_CLEANUP_INTERVAL = Duration.ofHours(1);
+
+  public static final String MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP = MYSQL_ISSUE_REPO_PREFIX + "maxIssuesToKeep";
+  public static final long DEFAULT_MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP = 10 * 1000 * 1000;
+
+  public static final String MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN =
+      MYSQL_ISSUE_REPO_PREFIX + "deleteIssuesOlderThan";
+  public static final Duration DEFAULT_MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN = Duration.ofDays(30);
+
+  // In-memory issue repository
+  public static final String MEMORY_ISSUE_REPO_PREFIX = GOBBLIN_SERVICE_PREFIX + "issueRepo.memory.";
+
+  public static final String MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT = MEMORY_ISSUE_REPO_PREFIX + "maxContextCount";
+  public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT = 100;
+
+  public static final String MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT = MEMORY_ISSUE_REPO_PREFIX + "maxIssuesPerContext";
+  public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
+
+  public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
 }
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
index 97708ef..4cb2c7d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
@@ -101,7 +101,8 @@ public class ClassAliasResolver<T> {
    * and is also a subclass of {@link #subtypeOf}, if it fails it returns a class object for name
    * <code>aliasOrClassName</code>.
    */
-  public Class<? extends T> resolveClass(final String aliasOrClassName) throws ClassNotFoundException {
+  public Class<? extends T> resolveClass(final String aliasOrClassName)
+      throws ClassNotFoundException {
     if (this.aliasToClassCache.containsKey(aliasOrClassName.toUpperCase())) {
       return this.aliasToClassCache.get(aliasOrClassName.toUpperCase());
     }
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java
index 280005d..19a81b0 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java
@@ -87,8 +87,7 @@ public class DatabaseJobHistoryStore implements JobHistoryStore {
   }
 
   private static MigrationVersion getDatabaseVersion(DataSource dataSource) throws FlywayException {
-    Flyway flyway = new Flyway();
-    flyway.setDataSource(dataSource);
+    Flyway flyway = Flyway.configure().dataSource(dataSource).load();
     MigrationInfoService info = flyway.info();
     MigrationVersion currentVersion = MigrationVersion.EMPTY;
     if (info.current() != null) {
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 3b54f42..3e01344 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
 import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.metastore.util.MysqlDataSourceUtils;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.io.StreamUtils;
@@ -187,9 +188,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     basicDataSource.setDriverClassName(ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_JDBC_DRIVER_KEY,
         ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER));
     // MySQL server can timeout a connection so need to validate connections before use
-    // This query will fail if db is in read-only mode, otherwise read-only connections may continue to fail and not get evicted
-    // See https://stackoverflow.com/questions/39552146/evicting-connections-to-a-read-only-node-in-a-cluster-from-the-connection-pool
-    basicDataSource.setValidationQuery("select case when @@read_only = 0 then 1 else (select table_name from information_schema.tables) end as `1`");
+    basicDataSource.setValidationQuery(MysqlDataSourceUtils.QUERY_CONNECTION_IS_VALID_AND_NOT_READONLY);
     basicDataSource.setTestOnBorrow(true);
     basicDataSource.setDefaultAutoCommit(false);
     basicDataSource.setTimeBetweenEvictionRunsMillis(60000);
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java
index ba1aa08..1ad50c1 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java
@@ -23,23 +23,24 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
-import com.google.common.io.Closer;
-
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.configuration.AbstractConfiguration;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.configuration.SystemConfiguration;
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.runtime.cli.CliApplication;
+import org.flywaydb.core.Flyway;
 import org.flywaydb.core.api.FlywayException;
 import org.flywaydb.core.api.MigrationInfoService;
-import org.flywaydb.core.Flyway;
 import org.flywaydb.core.internal.info.MigrationInfoDumper;
 
+import com.google.common.io.Closer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.metastore.DatabaseJobHistoryStore;
+import org.apache.gobblin.runtime.cli.CliApplication;
 
 
 /**
@@ -54,9 +55,7 @@ public class DatabaseJobHistoryStoreSchemaManager implements CliApplication, Clo
   private final Flyway flyway;
 
   private DatabaseJobHistoryStoreSchemaManager(Properties properties) {
-    flyway = new Flyway();
-    flyway.configure(properties);
-    flyway.setClassLoader(this.getClass().getClassLoader());
+    flyway = Flyway.configure(this.getClass().getClassLoader()).configuration(properties).load();
   }
 
   public static DataSourceBuilder builder() {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/MysqlDataSourceUtils.java
similarity index 52%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
copy to gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/MysqlDataSourceUtils.java
index e3eafbd..1c141a7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/MysqlDataSourceUtils.java
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.metastore.util;
 
-import java.util.List;
-
-/**
- * Stores issues from multiple jobs, flows and other contexts
- *
- * @see AutomaticTroubleshooter
- * */
-public interface MultiContextIssueRepository {
-
-  List<Issue> getAll(String contextId)
-      throws TroubleshooterException;
-
-  void put(String contextId, Issue issue)
-      throws TroubleshooterException;
-
-  void remove(String contextId, String issueCode)
-      throws TroubleshooterException;
+public final class MysqlDataSourceUtils {
+  /**
+   *  This query will validate that MySQL connection is active and Mysql instance is writable.
+   *
+   *  If a database failover happened, and current replica became read-only, this query will fail and
+   *  connection will be removed from the pool.
+   *
+   *  See https://stackoverflow.com/questions/39552146/evicting-connections-to-a-read-only-node-in-a-cluster-from-the-connection-pool
+   * */
+  public static final String QUERY_CONNECTION_IS_VALID_AND_NOT_READONLY =
+      "select case when @@read_only = 0 then 1 else (select table_name from information_schema.tables) end as `1`";
 
+  private MysqlDataSourceUtils() {
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
index 6727f3c..530798d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
@@ -19,47 +19,42 @@ package org.apache.gobblin.runtime.troubleshooter;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.commons.collections4.map.LRUMap;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
 
-import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * Stores issues from multiple jobs, flows or other contexts in memory.
  *
- * To limit the memory consumption, it will keep only the last {@link #MAX_CONTEXT_COUNT} contexts,
+ * To limit the memory consumption, it will keep only the last {@link Configuration#maxContextCount} contexts,
  * and older ones will be discarded.
  * */
 @Singleton
-public class InMemoryMultiContextIssueRepository implements MultiContextIssueRepository {
-  public static final int DEFAULT_MAX_CONTEXT_COUNT = 100;
-
-  public static final String CONFIG_PREFIX = "gobblin.troubleshooter.inMemoryIssueRepository.";
-  public static final String MAX_CONTEXT_COUNT = CONFIG_PREFIX + "maxContextCount";
-  public static final String MAX_ISSUE_PER_CONTEXT = CONFIG_PREFIX + "maxIssuesPerContext";
-
+public class InMemoryMultiContextIssueRepository extends AbstractIdleService implements MultiContextIssueRepository {
   private final LRUMap<String, InMemoryIssueRepository> contextIssues;
-  private final int maxIssuesPerContext;
+  private final Configuration configuration;
 
   public InMemoryMultiContextIssueRepository() {
-    this(ConfigFactory.empty());
+    this(Configuration.builder().build());
   }
 
   @Inject
-  public InMemoryMultiContextIssueRepository(Config config) {
-    this(ConfigUtils.getInt(config, MAX_CONTEXT_COUNT, DEFAULT_MAX_CONTEXT_COUNT),
-         ConfigUtils.getInt(config, MAX_ISSUE_PER_CONTEXT, InMemoryIssueRepository.DEFAULT_MAX_SIZE));
-  }
-
-  public InMemoryMultiContextIssueRepository(int maxContextCount, int maxIssuesPerContext) {
-    contextIssues = new LRUMap<>(maxContextCount);
-    this.maxIssuesPerContext = maxIssuesPerContext;
+  public InMemoryMultiContextIssueRepository(Configuration configuration) {
+    this.configuration = Objects.requireNonNull(configuration);
+    contextIssues = new LRUMap<>(configuration.getMaxContextCount());
   }
 
   @Override
@@ -78,14 +73,23 @@ public class InMemoryMultiContextIssueRepository implements MultiContextIssueRep
   @Override
   public synchronized void put(String contextId, Issue issue)
       throws TroubleshooterException {
-
-    InMemoryIssueRepository issueRepository =
-        contextIssues.computeIfAbsent(contextId, s -> new InMemoryIssueRepository(maxIssuesPerContext));
+    InMemoryIssueRepository issueRepository = contextIssues
+        .computeIfAbsent(contextId, s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));
 
     issueRepository.put(issue);
   }
 
   @Override
+  public synchronized void put(String contextId, List<Issue> issues)
+      throws TroubleshooterException {
+
+    InMemoryIssueRepository issueRepository = contextIssues
+        .computeIfAbsent(contextId, s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));
+
+    issueRepository.put(issues);
+  }
+
+  @Override
   public synchronized void remove(String contextId, String issueCode)
       throws TroubleshooterException {
 
@@ -95,4 +99,39 @@ public class InMemoryMultiContextIssueRepository implements MultiContextIssueRep
       issueRepository.remove(issueCode);
     }
   }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+  }
+
+  @Builder
+  @Getter
+  @AllArgsConstructor
+  @NoArgsConstructor
+  public static class Configuration {
+
+    @Builder.Default
+    private int maxContextCount = ServiceConfigKeys.DEFAULT_MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT;
+
+    @Builder.Default
+    private int maxIssuesPerContext = ServiceConfigKeys.DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT;
+
+    @Inject
+    public Configuration(Config innerConfig) {
+      this();
+      if (innerConfig.hasPath(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT)) {
+        maxContextCount = innerConfig.getInt(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT);
+      }
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT)) {
+        maxIssuesPerContext = innerConfig.getInt(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT);
+      }
+    }
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
index de4bafd..7f694b2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.ToString;
 
 
 /**
@@ -36,7 +37,10 @@ import lombok.Getter;
 @Builder
 @Getter
 @EqualsAndHashCode
+@ToString
 public class Issue {
+  public static final int MAX_ISSUE_CODE_LENGTH = 100;
+  public static final int MAX_CLASSNAME_LENGTH = 1000;
 
   private final ZonedDateTime time;
   private final IssueSeverity severity;
@@ -45,6 +49,8 @@ public class Issue {
    * Unique code that identifies a specific problem.
    *
    * It can be used for making programmatic decisions on how to handle and recover from this issue.
+   *
+   * The code length should be less than {@link Issue.MAX_ISSUE_CODE_LENGTH}
    * */
   private final String code;
 
@@ -64,11 +70,15 @@ public class Issue {
    * Unique name of the component that produced the issue.
    *
    * This is a full name of the class that logged the error or generated the issue.
+   *
+   * The class name length should be less than {@link Issue.MAX_CLASSNAME_LENGTH}
    * */
   private final String sourceClass;
 
   /**
    * If the issue was generated from an exception, then a full exception class name should be stored here.
+   *
+   * The class name length should be less than {@link Issue.MAX_CLASSNAME_LENGTH}
    */
   private final String exceptionClass;
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
index bccd01b..a6e7292 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
@@ -45,22 +45,22 @@ import org.apache.gobblin.util.ConfigUtils;
 public class JobIssueEventHandler {
 
   public static final String CONFIG_PREFIX = "gobblin.troubleshooter.jobIssueEventHandler.";
-  public static final String LOG_RECEIVED_EVENTS = CONFIG_PREFIX + "logReceiveEvents";
+  public static final String LOG_RECEIVED_EVENTS = CONFIG_PREFIX + "logReceivedEvents";
 
   private static final Logger issueLogger =
       LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
 
   private final MultiContextIssueRepository issueRepository;
-  private final boolean logReceiveEvents;
+  private final boolean logReceivedEvents;
 
   @Inject
   public JobIssueEventHandler(MultiContextIssueRepository issueRepository, Config config) {
     this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS, true));
   }
 
-  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, boolean logReceiveEvents) {
+  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, boolean logReceivedEvents) {
     this.issueRepository = Objects.requireNonNull(issueRepository);
-    this.logReceiveEvents = logReceiveEvents;
+    this.logReceivedEvents = logReceivedEvents;
   }
 
   public void processEvent(GobblinTrackingEvent event) {
@@ -83,10 +83,11 @@ public class JobIssueEventHandler {
     try {
       issueRepository.put(contextId, issueEvent.getIssue());
     } catch (TroubleshooterException e) {
-      log.warn("Failed to save issue to repository. Issue code: " + issueEvent.getIssue().getCode(), e);
+      log.warn(String.format("Failed to save issue to repository. Issue time: %s, code: %s",
+                             issueEvent.getIssue().getTime(), issueEvent.getIssue().getCode()), e);
     }
 
-    if (logReceiveEvents) {
+    if (logReceivedEvents) {
       logEvent(issueEvent);
     }
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
index e3eafbd..1d30ed3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
@@ -19,20 +19,30 @@ package org.apache.gobblin.runtime.troubleshooter;
 
 import java.util.List;
 
+import com.google.common.util.concurrent.Service;
+
+
 /**
- * Stores issues from multiple jobs, flows and other contexts
+ * Stores issues from multiple jobs, flows and other contexts.
+ *
+ * For each context, there can only be one issue with a specific code.
  *
  * @see AutomaticTroubleshooter
  * */
-public interface MultiContextIssueRepository {
+public interface MultiContextIssueRepository extends Service {
 
+  /**
+   * Will return issues in the same order as they were put into the repository.
+   * */
   List<Issue> getAll(String contextId)
       throws TroubleshooterException;
 
   void put(String contextId, Issue issue)
       throws TroubleshooterException;
 
-  void remove(String contextId, String issueCode)
+  void put(String contextId, List<Issue> issues)
       throws TroubleshooterException;
 
+  void remove(String contextId, String issueCode)
+      throws TroubleshooterException;
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
index f4f3e97..241b20d 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
@@ -34,7 +34,9 @@ public class InMemoryMultiContextIssueRepositoryTest extends MultiContextIssueRe
     int jobCount = 100;
     int jobCapacity = 50;
 
-    MultiContextIssueRepository repository = new InMemoryMultiContextIssueRepository(50, 10);
+    MultiContextIssueRepository repository = new InMemoryMultiContextIssueRepository(
+        InMemoryMultiContextIssueRepository.Configuration.builder().maxContextCount(50).maxIssuesPerContext(10)
+            .build());
 
     for (int j = 0; j < jobCount; j++) {
       repository.put("job" + j, getTestIssue("issue 1", "code1"));
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
index b479935..c03f81f 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
@@ -43,7 +43,7 @@ public class JobIssueEventHandlerTest {
 
     eventHandler.processEvent(eventBuilder.build());
 
-    verify(issueRepository).put(any(), any());
+    verify(issueRepository).put(any(), (Issue) any());
   }
 
   private Issue getTestIssue(String summary, String code) {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
index a524408..1244cca 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
@@ -21,6 +21,8 @@ import java.util.List;
 
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.service.ServiceConfigKeys;
+
 import static org.testng.Assert.assertEquals;
 
 
@@ -90,7 +92,7 @@ public abstract class MultiContextIssueRepositoryTest {
       throws Exception {
 
     int jobCount = 10;
-    int issueCount = 50;
+    int issueCount = ServiceConfigKeys.DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT;
 
     MultiContextIssueRepository repository = getRepository();
 
@@ -102,6 +104,7 @@ public abstract class MultiContextIssueRepositoryTest {
 
     for (int j = 0; j < jobCount; j++) {
       List<Issue> retrievedIssues = repository.getAll("job" + j);
+      assertEquals(retrievedIssues.size(), issueCount);
       for (int i = 0; i < issueCount; i++) {
         assertEquals(String.valueOf(i), retrievedIssues.get(i).getCode());
       }
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index c32885b..68f3ac7 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -43,7 +43,9 @@ dependencies {
   compile externalDependency.curatorFramework
   compile externalDependency.curatorClient
   compile externalDependency.curatorRecipes
+  compile externalDependency.commonsDbcp2
   compile externalDependency.findBugsAnnotations
+  compile externalDependency.flyway
   compile externalDependency.gson
   compile externalDependency.guava
   compile externalDependency.guavaretrying
@@ -76,6 +78,7 @@ dependencies {
   testCompile project(":gobblin-runtime").sourceSets.test.output
   testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
   testCompile project(":gobblin-test-utils")
+  testCompile externalDependency.assertj
   testCompile externalDependency.byteman
   testCompile externalDependency.bytemanBmunit
   testCompile externalDependency.calciteCore
@@ -86,6 +89,8 @@ dependencies {
   testCompile externalDependency.hamcrest
   testCompile externalDependency.jhyde
   testCompile externalDependency.mockito
+  testCompile externalDependency.testContainers
+  testCompile externalDependency.testContainersMysql
 }
 
 // Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 21db6d7..41e0174 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -59,6 +59,9 @@ import org.apache.gobblin.service.GroupOwnershipService;
 import org.apache.gobblin.service.NoopRequesterService;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
@@ -66,6 +69,7 @@ import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2Resou
 import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
+import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.modules.utils.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -205,12 +209,23 @@ public class GobblinServiceGuiceModule implements Module {
             JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
 
     if (serviceConfig.isRestLIServerEnabled()) {
-      binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class).in(Singleton.class);
+      binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class);
     }
 
     binder.bind(GobblinServiceManager.class);
 
-    binder.bind(MultiContextIssueRepository.class).to(InMemoryMultiContextIssueRepository.class);
+    binder.bind(ServiceDatabaseProvider.class).to(ServiceDatabaseProviderImpl.class);
+    binder.bind(ServiceDatabaseProviderImpl.Configuration.class);
+
+    binder.bind(ServiceDatabaseManager.class);
+
+    binder.bind(MultiContextIssueRepository.class)
+        .to(getClassByNameOrAlias(MultiContextIssueRepository.class, serviceConfig.getInnerConfig(),
+                                  ServiceConfigKeys.ISSUE_REPO_CLASS,
+                                  InMemoryMultiContextIssueRepository.class.getName()));
+
+    binder.bind(MySqlMultiContextIssueRepository.Configuration.class);
+    binder.bind(InMemoryMultiContextIssueRepository.Configuration.class);
 
     binder.bind(JobIssueEventHandler.class);
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8c2a49c..9bd6cd8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -78,6 +78,7 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
@@ -90,6 +91,7 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.GroupOwnershipService;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
@@ -189,6 +191,12 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   @Inject(optional = true)
   protected KafkaJobStatusMonitor jobStatusMonitor;
 
+  @Inject
+  protected MultiContextIssueRepository issueRepository;
+
+  @Inject
+  protected ServiceDatabaseManager databaseManager;
+
   protected Optional<HelixLeaderState> helixLeaderGauges;
 
   @Inject(optional = true)
@@ -348,6 +356,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       this.serviceLauncher.addService(dagManager);
     }
 
+    this.serviceLauncher.addService(databaseManager);
+    this.serviceLauncher.addService(issueRepository);
+
     if (configuration.isJobStatusMonitorEnabled()) {
       this.serviceLauncher.addService(jobStatusMonitor);
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseManager.java
new file mode 100644
index 0000000..da1ad05
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.db;
+
+import java.util.Objects;
+
+import org.flywaydb.core.Flyway;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This class initializes and migrates the database schema for Gobblin service.
+ *
+ * We use Flyway to run the migrations that are defined in resources/org/apache/gobblin/service/db/migration
+ * */
+@Singleton
+@Slf4j
+public class ServiceDatabaseManager extends AbstractIdleService {
+
+  private final ServiceDatabaseProvider databaseProvider;
+
+  @Inject
+  public ServiceDatabaseManager(ServiceDatabaseProvider databaseProvider) {
+    this.databaseProvider = Objects.requireNonNull(databaseProvider);
+  }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+
+    Flyway flyway =
+        Flyway.configure().locations("classpath:org/apache/gobblin/service/db/migration").failOnMissingLocations(true)
+            .dataSource(databaseProvider.getDatasource()).load();
+
+    log.info("Ensuring service database is migrated to latest schema");
+    // Start the migration
+    flyway.migrate();
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProvider.java
similarity index 63%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
copy to gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProvider.java
index e3eafbd..da30441 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProvider.java
@@ -15,24 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.service.modules.db;
+
+import javax.sql.DataSource;
 
-import java.util.List;
 
 /**
- * Stores issues from multiple jobs, flows and other contexts
+ * Provides access to Gobblin service database.
  *
- * @see AutomaticTroubleshooter
+ * DB schema is defined using Flyway migrations.
  * */
-public interface MultiContextIssueRepository {
-
-  List<Issue> getAll(String contextId)
-      throws TroubleshooterException;
-
-  void put(String contextId, Issue issue)
-      throws TroubleshooterException;
-
-  void remove(String contextId, String issueCode)
-      throws TroubleshooterException;
-
+public interface ServiceDatabaseProvider {
+  DataSource getDatasource();
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProviderImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProviderImpl.java
new file mode 100644
index 0000000..bb05ba6
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProviderImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.db;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.sql.DataSource;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import org.apache.gobblin.metastore.util.MysqlDataSourceUtils;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class ServiceDatabaseProviderImpl implements ServiceDatabaseProvider {
+
+  private final Configuration configuration;
+  private BasicDataSource dataSource;
+
+  @Inject
+  public ServiceDatabaseProviderImpl(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  public DataSource getDatasource() {
+    ensureDataSource();
+    return dataSource;
+  }
+
+  private synchronized void ensureDataSource() {
+    if (dataSource != null) {
+      return;
+    }
+
+    dataSource = new BasicDataSource();
+
+    dataSource.setUrl(configuration.getUrl());
+    dataSource.setUsername(configuration.getUserName());
+    dataSource.setPassword(configuration.getPassword());
+
+    // MySQL server can timeout a connection so we need to validate connections.
+    dataSource.setValidationQuery(MysqlDataSourceUtils.QUERY_CONNECTION_IS_VALID_AND_NOT_READONLY);
+    dataSource.setValidationQueryTimeout(5);
+
+    // To improve performance, we only check connections on creation, and set a maximum connection lifetime
+    // If database goes to read-only mode, then connection would not work correctly for up to configured lifetime
+    dataSource.setTestOnCreate(true);
+    dataSource.setMaxConnLifetimeMillis(configuration.getMaxConnectionLifetime().toMillis());
+    dataSource.setTimeBetweenEvictionRunsMillis(Duration.ofSeconds(10).toMillis());
+    dataSource.setMinIdle(2);
+    dataSource.setMaxTotal(configuration.getMaxConnections());
+  }
+
+  @Builder
+  @AllArgsConstructor
+  @Getter
+  @NoArgsConstructor
+  public static class Configuration {
+
+    private String url;
+    private String userName;
+    private String password;
+
+    @Builder.Default
+    private Duration maxConnectionLifetime = Duration.ofMinutes(1);
+
+    @Builder.Default
+    private int maxConnections = 100;
+
+    @Inject
+    public Configuration(Config config) {
+      this();
+      Objects.requireNonNull(config, "Config cannot be null");
+
+      url = config.getString(ServiceConfigKeys.SERVICE_DB_URL_KEY);
+
+      PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));
+
+      userName = config.getString(ServiceConfigKeys.SERVICE_DB_USERNAME);
+      password = passwordManager.readPassword(config.getString(ServiceConfigKeys.SERVICE_DB_PASSWORD));
+
+      if(config.hasPath(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTIONS)){
+        maxConnections = config.getInt(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTIONS);
+      }
+
+      if(config.hasPath(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTION_LIFETIME)){
+        maxConnectionLifetime = config.getDuration(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTION_LIFETIME);
+      }
+    }
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index b256ae2..9058fb0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -105,8 +105,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
 
 
-  public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<Logger> log, boolean instrumentationEnabled) {
+  public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
+      FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
@@ -154,7 +154,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   @Inject
   public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
       Optional<DagManager> dagManager, Optional<Logger> log) {
-    this(config, flowStatusGenerator, topologyCatalog, dagManager, log, true);
+    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true);
   }
 
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
new file mode 100644
index 0000000..3c4c60d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.troubleshooter;
+
+import java.lang.reflect.Type;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.util.GsonUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
+
+
+@Singleton
+@Slf4j
+public class MySqlMultiContextIssueRepository extends AbstractIdleService implements MultiContextIssueRepository {
+
+  private final ServiceDatabaseProvider databaseProvider;
+  private final Configuration configuration;
+  private ScheduledExecutorService scheduledExecutor;
+
+  public MySqlMultiContextIssueRepository(ServiceDatabaseProvider databaseProvider) {
+    this(databaseProvider, MySqlMultiContextIssueRepository.Configuration.builder().build());
+  }
+
+  @Inject
+  public MySqlMultiContextIssueRepository(ServiceDatabaseProvider databaseProvider, Configuration configuration) {
+    this.databaseProvider = Objects.requireNonNull(databaseProvider);
+    this.configuration = Objects.requireNonNull(configuration);
+  }
+
+  @Override
+  public List<Issue> getAll(String contextId)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+
+    String querySql = "select code, time, severity, summary, details, source_class, exception_class, properties "
+        + "from issues where context_id = ? order by position";
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(querySql)) {
+
+      statement.setString(1, contextId);
+
+      ArrayList<Issue> issues = new ArrayList<>();
+
+      try (ResultSet results = statement.executeQuery()) {
+        while (results.next()) {
+          Issue.IssueBuilder issue = Issue.builder();
+          issue.code(results.getString(1));
+          issue.time(ZonedDateTime.ofInstant(Instant.ofEpochMilli(results.getTimestamp(2).getTime()), ZoneOffset.UTC));
+          issue.severity(IssueSeverity.valueOf(results.getString(3)));
+          issue.summary(results.getString(4));
+          issue.details(results.getString(5));
+          issue.sourceClass(results.getString(6));
+          issue.exceptionClass(results.getString(7));
+
+          String serializedProperties = results.getString(8);
+          if (serializedProperties != null) {
+            Type mapType = new TypeToken<HashMap<String, String>>() {
+            }.getType();
+
+            HashMap<String, String> properties =
+                GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(serializedProperties, mapType);
+            issue.properties(properties);
+          }
+
+          issues.add(issue.build());
+        }
+      }
+
+      return issues;
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot read issues from the database", e);
+    }
+  }
+
+  @Override
+  public void put(String contextId, Issue issue)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+    Objects.requireNonNull(issue, "issue should not be null");
+
+    put(contextId, Collections.singletonList(issue));
+  }
+
+  @Override
+  public void put(String contextId, List<Issue> issues)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+    Objects.requireNonNull(issues, "issues should not be null");
+
+    String statementSql =
+        "replace into issues (context_id, code, time, severity,summary,details,source_class,exception_class,properties) "
+            + "values (?,?,?,?,?,?,?,?,?)";
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(statementSql)) {
+      connection.setAutoCommit(false);
+
+      for (Issue issue : issues) {
+        statement.setString(1, contextId);
+        statement.setString(2, issue.getCode());
+        statement.setTimestamp(3, new Timestamp(issue.getTime().toInstant().toEpochMilli()));
+        statement.setString(4, issue.getSeverity().toString());
+        statement.setString(5, issue.getSummary());
+        statement.setString(6, issue.getDetails());
+        statement.setString(7, issue.getSourceClass());
+        statement.setString(8, issue.getExceptionClass());
+
+        String serializedProperties = null;
+        if (issue.getProperties() != null) {
+          serializedProperties = GsonUtils.GSON_WITH_DATE_HANDLING.toJson(issue.getProperties());
+        }
+        statement.setString(9, serializedProperties);
+
+        statement.executeUpdate();
+      }
+      connection.commit();
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot save issue to the database", e);
+    }
+  }
+
+  @Override
+  public void remove(String contextId, String issueCode)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+    Objects.requireNonNull(issueCode, "issueCode should not be null");
+
+    String statementSql = "delete from issues where context_id=? and code=?";
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(statementSql)) {
+      statement.setString(1, contextId);
+      statement.setString(2, issueCode);
+
+      statement.executeUpdate();
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot remove issue from the database", e);
+    }
+  }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+    scheduledExecutor = Executors.newScheduledThreadPool(1);
+    scheduledExecutor.scheduleAtFixedRate(this::cleanupOldIssues, configuration.cleanupInterval.toMillis(),
+                                          configuration.cleanupInterval.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+    scheduledExecutor.shutdown();
+  }
+
+  private void cleanupOldIssues() {
+    try {
+      deleteIssuesOlderThan(ZonedDateTime.now().minus(configuration.deleteIssuesOlderThan));
+      deleteOldIssuesOverTheCount(configuration.maxIssuesToKeep);
+    } catch (Exception ex) {
+      log.warn("Failed to cleanup old issues", ex);
+    }
+  }
+
+  @VisibleForTesting
+  public void deleteIssuesOlderThan(ZonedDateTime olderThanDate)
+      throws SQLException {
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement("delete from issues where time < ?")) {
+
+      Instant deleteBefore = olderThanDate.withZoneSameInstant(ZoneOffset.UTC).toInstant();
+      statement.setTimestamp(1, new Timestamp(deleteBefore.toEpochMilli()));
+
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      int deletedIssues = statement.executeUpdate();
+      log.info("Deleted {} issues that are older than {} in {} ms", deletedIssues, deleteBefore,
+               stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    }
+  }
+
+  @VisibleForTesting
+  public void deleteOldIssuesOverTheCount(long maxIssuesToKeep)
+      throws SQLException {
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement countQuery = connection.prepareStatement("select count(*) from issues");
+        ResultSet resultSet = countQuery.executeQuery()) {
+
+      resultSet.next();
+      long totalIssueCount = resultSet.getLong(1);
+
+      long issuesToRemove = totalIssueCount - maxIssuesToKeep;
+      if (issuesToRemove <= 0) {
+        return;
+      }
+
+      // position is a table-wide auto-increment field. older issues will have smaller position.
+      try (PreparedStatement deleteStatement = connection
+          .prepareStatement("delete from issues order by position limit ?")) {
+
+        deleteStatement.setLong(1, issuesToRemove);
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        int deletedIssues = deleteStatement.executeUpdate();
+        log.info("Deleted {} issues to keep the total issue count under {} in {} ms", deletedIssues, maxIssuesToKeep,
+                 stopwatch.elapsed(TimeUnit.MILLISECONDS));
+      }
+    }
+  }
+
+  @Builder
+  @Getter
+  @AllArgsConstructor
+  @NoArgsConstructor
+  public static class Configuration {
+    @Builder.Default
+    private Duration cleanupInterval = ServiceConfigKeys.DEFAULT_MYSQL_ISSUE_REPO_CLEANUP_INTERVAL;
+
+    @Builder.Default
+    private long maxIssuesToKeep = ServiceConfigKeys.DEFAULT_MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP;
+
+    @Builder.Default
+    private Duration deleteIssuesOlderThan = ServiceConfigKeys.DEFAULT_MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN;
+
+    @Inject
+    public Configuration(Config innerConfig) {
+      this(); // see https://github.com/projectlombok/lombok/issues/1347
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MYSQL_ISSUE_REPO_CLEANUP_INTERVAL)) {
+        cleanupInterval = innerConfig.getDuration(ServiceConfigKeys.MYSQL_ISSUE_REPO_CLEANUP_INTERVAL);
+      }
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP)) {
+        maxIssuesToKeep = innerConfig.getLong(ServiceConfigKeys.MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP);
+      }
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN)) {
+        deleteIssuesOlderThan = innerConfig.getDuration(ServiceConfigKeys.MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN);
+      }
+    }
+  }
+}
diff --git a/gobblin-service/src/main/resources/org/apache/gobblin/service/db/migration/V1__AddIssuesTable.sql b/gobblin-service/src/main/resources/org/apache/gobblin/service/db/migration/V1__AddIssuesTable.sql
new file mode 100644
index 0000000..ccb9ff8
--- /dev/null
+++ b/gobblin-service/src/main/resources/org/apache/gobblin/service/db/migration/V1__AddIssuesTable.sql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+--
+--  Licensed to the Apache Software Foundation (ASF) under one or more
+--  contributor license agreements.  See the NOTICE file distributed with
+--  The ASF licenses this file to You under the Apache License, Version 2.0
+--  (the "License"); you may not use this file except in compliance with
+--  the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+--  Unless required by applicable law or agreed to in writing, software
+--  distributed under the License is distributed on an "AS IS" BASIS,
+--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+--  See the License for the specific language governing permissions and
+--  limitations under the License.
+--
+
+create table issues
+(
+    context_id varchar(660) not null, -- context_id + code length should fit into mysql key size limits
+    code varchar(100) not null,
+    time datetime not null,
+    position bigint not null auto_increment, -- it is used to sort issues by insertion order
+    severity varchar(10) not null,
+    summary mediumtext not null,
+    details mediumtext null,
+    source_class varchar(1000) null,
+    exception_class varchar(1000) null,
+    properties json null,
+
+    constraint issues_pk
+        primary key (context_id, code),
+
+    key issues_position (position)
+);
+
+create index issues_time_index
+	on issues (time);
+
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 82eb270..2572fda 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -35,6 +35,7 @@ import org.eclipse.jgit.transport.RefSpec;
 import org.eclipse.jgit.util.FS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -101,6 +102,8 @@ public class GobblinServiceManagerTest {
   private GobblinServiceManager gobblinServiceManager;
   private FlowConfigV2Client flowConfigClient;
 
+  private MySQLContainer mysql;
+
   private Git gitForPush;
   private TestingServer testingServer;
   Properties serviceCoreProperties = new Properties();
@@ -114,6 +117,13 @@ public class GobblinServiceManagerTest {
   public void setup() throws Exception {
     cleanUpDir(SERVICE_WORK_DIR);
     cleanUpDir(SPEC_STORE_PARENT_DIR);
+
+    mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+    mysql.start();
+    serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_URL_KEY, mysql.getJdbcUrl());
+    serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, mysql.getUsername());
+    serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, mysql.getPassword());
+
     ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     testingServer = new TestingServer(true);
@@ -212,6 +222,8 @@ public class GobblinServiceManagerTest {
     } catch(Exception e) {
       System.err.println("Failed to close ZK testing server.");
     }
+
+    mysql.stop();
   }
 
   /**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-service/src/test/java/org/apache/gobblin/service/TestServiceDatabaseConfig.java
similarity index 62%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
copy to gobblin-service/src/test/java/org/apache/gobblin/service/TestServiceDatabaseConfig.java
index e3eafbd..801ff3d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/TestServiceDatabaseConfig.java
@@ -15,24 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.troubleshooter;
-
-import java.util.List;
-
-/**
- * Stores issues from multiple jobs, flows and other contexts
- *
- * @see AutomaticTroubleshooter
- * */
-public interface MultiContextIssueRepository {
-
-  List<Issue> getAll(String contextId)
-      throws TroubleshooterException;
-
-  void put(String contextId, Issue issue)
-      throws TroubleshooterException;
-
-  void remove(String contextId, String issueCode)
-      throws TroubleshooterException;
+package org.apache.gobblin.service;
 
+public class TestServiceDatabaseConfig {
+  public static final String MysqlVersion = "8.0.20";
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 4263b31..3a03be7 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,6 +48,7 @@ import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.TestServiceDatabaseConfig;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
@@ -100,6 +102,8 @@ public class GobblinServiceHATest {
 
   private TestingServer testingZKServer;
 
+  private MySQLContainer mysql;
+
   @BeforeClass
   public void setup() throws Exception {
     // Clean up common Flow Spec Dir
@@ -118,9 +122,17 @@ public class GobblinServiceHATest {
     logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
     HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);
 
+
     ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     Properties commonServiceCoreProperties = new Properties();
+
+    mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+    mysql.start();
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_URL_KEY, mysql.getJdbcUrl());
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, mysql.getUsername());
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, mysql.getPassword());
+
     commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
     commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
     commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, "GaaS_" + UUID.randomUUID().toString());
@@ -237,6 +249,8 @@ public class GobblinServiceHATest {
     }
 
     cleanUpDir(COMMON_SPEC_STORE_PARENT_DIR);
+
+    mysql.stop();
   }
 
   @Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index 003e318..4f6fbde 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -26,6 +26,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,6 +48,7 @@ import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.TestServiceDatabaseConfig;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
@@ -106,6 +108,8 @@ public class GobblinServiceRedirectTest {
   private Properties node1ServiceCoreProperties;
   private Properties node2ServiceCoreProperties;
 
+  private MySQLContainer mysql;
+
   @BeforeClass
   public void setup() throws Exception {
     port1 = Integer.toString(new PortUtils.ServerSocketPortLocator().random());
@@ -122,6 +126,13 @@ public class GobblinServiceRedirectTest {
     ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     Properties commonServiceCoreProperties = new Properties();
+
+    mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+    mysql.start();
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_URL_KEY, mysql.getJdbcUrl());
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, mysql.getUsername());
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, mysql.getPassword());
+
     commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
     commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
     commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, "GaaS_" + UUID.randomUUID().toString());
@@ -210,6 +221,8 @@ public class GobblinServiceRedirectTest {
     } catch (Exception e) {
       logger.warn("Could not cleanly stop Testing Zookeeper", e);
     }
+
+    mysql.stop();
   }
 
   @Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepositoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepositoryTest.java
new file mode 100644
index 0000000..86fabfd
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepositoryTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.troubleshooter;
+
+import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.testcontainers.containers.MySQLContainer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Stopwatch;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.TestServiceDatabaseConfig;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+
+public class MySqlMultiContextIssueRepositoryTest {
+  private int testId = 1;
+  private MySQLContainer mysql;
+  private ServiceDatabaseProviderImpl databaseProvider;
+  private ServiceDatabaseManager databaseManager;
+  private MySqlMultiContextIssueRepository repository;
+
+  @BeforeMethod
+  public void setup() {
+    testId++;
+  }
+
+  @BeforeClass
+  public void classSetUp() {
+    mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+    mysql.start();
+
+    ServiceDatabaseProviderImpl.Configuration dbConfig =
+        ServiceDatabaseProviderImpl.Configuration.builder().url(mysql.getJdbcUrl()).userName(mysql.getUsername())
+            .password(mysql.getPassword()).build();
+
+    databaseProvider = new ServiceDatabaseProviderImpl(dbConfig);
+
+    databaseManager = new ServiceDatabaseManager(databaseProvider);
+    databaseManager.startAsync().awaitRunning();
+    repository = new MySqlMultiContextIssueRepository(databaseProvider);
+  }
+
+  @AfterClass
+  public void classTearDown() {
+    databaseManager.stopAsync().awaitTerminated();
+    mysql.stop();
+  }
+
+  @Test
+  public void canReadEmptyRepository()
+      throws Exception {
+    List<Issue> issues = repository.getAll("test-nonexistent");
+    assertThat(issues).isEmpty();
+  }
+
+  @Test
+  public void canCreateWithEmptyConfiguration()
+      throws Exception {
+    MySqlMultiContextIssueRepository.Configuration configuration =
+        new MySqlMultiContextIssueRepository.Configuration(ConfigFactory.empty());
+
+    MySqlMultiContextIssueRepository newRepo = new MySqlMultiContextIssueRepository(databaseProvider, configuration);
+
+    newRepo.startAsync().awaitRunning();
+
+    List<Issue> issues = newRepo.getAll("test-nonexistent");
+    assertThat(issues).isEmpty();
+
+    newRepo.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void canPutAndGetFullIssue()
+      throws Exception {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("test.prop1", "test value 1");
+    properties.put("test.prop2", "test value 2");
+
+    // Mysql date has less precision than Java date, so we zero sub-second component of the date to get the same
+    // value after retrieval from db
+
+    Issue issue = Issue.builder().summary("Test summary \" ' -- ").code("CODE1")
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.ERROR)
+        .details("details for test issue").exceptionClass("java.io.IOException")
+        .sourceClass("org.apache.gobblin.service.modules.troubleshooter.AutoTroubleshooterLogAppender")
+        .properties(properties).build();
+
+    String contextId = "context-" + testId;
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void canPutAndGetMinimalIssue()
+      throws Exception {
+    Issue issue = Issue.builder().summary("Test summary").code("CODE1")
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN).build();
+
+    String contextId = "context-" + testId;
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void canPutIssueWithMaximumFieldLengths()
+      throws Exception {
+    // summary and details are bounded at 16MB, so we just put reasonably large values there
+    Issue issue = Issue.builder().summary(StringUtils.repeat("s", 100000)).details(StringUtils.repeat("s", 100000))
+        .code(StringUtils.repeat("c", Issue.MAX_ISSUE_CODE_LENGTH))
+        .exceptionClass(StringUtils.repeat("e", Issue.MAX_CLASSNAME_LENGTH))
+        .sourceClass(StringUtils.repeat("s", Issue.MAX_CLASSNAME_LENGTH))
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN).build();
+
+    String contextId = TroubleshooterUtils
+        .getContextIdForJob(StringUtils.repeat("g", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH),
+                            StringUtils.repeat("f", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH),
+                            String.valueOf(Long.MAX_VALUE),
+                            StringUtils.repeat("j", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void willGetMeaningfulErrorOnOversizedData()
+      throws Exception {
+    Issue issue = Issue.builder().summary("Test summary").code(StringUtils.repeat("c", Issue.MAX_ISSUE_CODE_LENGTH * 2))
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN).build();
+
+    String contextId = "context-" + testId;
+
+    assertThatThrownBy(() -> {
+      repository.put(contextId, issue);
+    }).isInstanceOf(TroubleshooterException.class).getRootCause()
+        .hasMessageContaining("Data too long for column 'code'");
+  }
+
+  @Test
+  public void willRollbackWhenSomeIssuesAreInvalid()
+      throws Exception {
+    Issue validIssue = getTestIssue("test", "test1");
+    Issue invalidIssue =
+        Issue.builder().summary("Test summary").code(StringUtils.repeat("c", Issue.MAX_ISSUE_CODE_LENGTH * 2))
+            .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN)
+            .build();
+
+    String contextId = "context-" + testId;
+
+    try {
+      repository.put(contextId, Arrays.asList(validIssue, invalidIssue));
+    } catch (TroubleshooterException ex) {
+      // exception is expected
+    }
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).isEmpty();
+  }
+
+  @Test
+  public void canPutIssueRepeatedly()
+      throws Exception {
+    Issue issue = getTestIssue("test", "test1");
+
+    String contextId = "context-" + testId;
+
+    repository.put(contextId, issue);
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void canPutAndGetMultipleIssues()
+      throws Exception {
+    Issue issue1 = getTestIssue("test-1", "code1");
+    Issue issue2 = getTestIssue("test-2", "code2");
+    Issue issue3 = getTestIssue("test-3", "code3");
+    repository.put("context-1-" + testId, issue1);
+    repository.put("context-1-" + testId, issue2);
+
+    repository.put("context-2-" + testId, issue2);
+    repository.put("context-2-" + testId, issue3);
+
+    List<Issue> context1Issues = repository.getAll("context-1-" + testId);
+    assertThat(context1Issues).hasSize(2);
+    assertThat(context1Issues.get(0)).usingRecursiveComparison().isEqualTo(issue1);
+    assertThat(context1Issues.get(1)).usingRecursiveComparison().isEqualTo(issue2);
+
+    List<Issue> context2Issues = repository.getAll("context-2-" + testId);
+    assertThat(context2Issues).hasSize(2);
+    assertThat(context2Issues.get(0)).usingRecursiveComparison().isEqualTo(issue2);
+    assertThat(context2Issues.get(1)).usingRecursiveComparison().isEqualTo(issue3);
+  }
+
+  @Test
+  public void canRemoveIssue()
+      throws Exception {
+    Issue issue1 = getTestIssue("test-1", "code1");
+    Issue issue2 = getTestIssue("test-2", "code2");
+    Issue issue3 = getTestIssue("test-3", "code3");
+
+    String contextId = "context-1-" + testId;
+    repository.put(contextId, issue1);
+    repository.put(contextId, issue2);
+    repository.put(contextId, issue3);
+
+    repository.remove(contextId, issue2.getCode());
+
+    List<Issue> issues = repository.getAll(contextId);
+    assertThat(issues).hasSize(2);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue1);
+    assertThat(issues.get(1)).usingRecursiveComparison().isEqualTo(issue3);
+  }
+
+  @Test
+  public void willPreserveIssueOrder()
+      throws Exception {
+    Random random = new Random(1);
+
+    List<Issue> issues = new ArrayList<>();
+
+    String contextId = "context-" + testId;
+    for (int i = 0; i < 100; i++) {
+      Issue issue = getTestIssue("test-" + random.nextInt(), "code-" + random.nextInt());
+      issues.add(issue);
+      repository.put(contextId, issue);
+    }
+
+    List<Issue> retrievedIssues = repository.getAll(contextId);
+    assertThat(retrievedIssues).usingRecursiveComparison().isEqualTo(issues);
+  }
+
+  @Test
+  public void canRemoveIssuesAboveSpecifiedCount()
+      throws Exception {
+    String contextId = "context-" + testId;
+    for (int i = 0; i < 100; i++) {
+      Issue issue = getTestIssue("test-" + i, "code-" + i);
+      repository.put(contextId, issue);
+    }
+
+    repository.deleteOldIssuesOverTheCount(20);
+
+    List<Issue> retrievedIssues = repository.getAll(contextId);
+    assertThat(retrievedIssues).hasSize(20);
+    assertThat(retrievedIssues.get(0).getCode()).isEqualTo("code-80");
+    assertThat(retrievedIssues.get(19).getCode()).isEqualTo("code-99");
+  }
+
+  @Test
+  public void canRemoveOlderIssues()
+      throws Exception {
+    String contextId = "context-" + testId;
+    int issueCount = 100;
+    ZonedDateTime startTime = ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC);
+    for (int i = 0; i < 100; i++) {
+      Issue issue = Issue.builder().summary("test summary").code("code-" + i)
+          .time(startTime.minus(Duration.ofDays(issueCount - i))).severity(IssueSeverity.ERROR).build();
+      repository.put(contextId, issue);
+    }
+
+    repository.deleteIssuesOlderThan(startTime.minus(Duration.ofDays(20).plus(Duration.ofHours(1))));
+
+    List<Issue> retrievedIssues = repository.getAll(contextId);
+    assertThat(retrievedIssues).hasSize(20);
+  }
+
+  @Test(enabled = false) // Load test takes several minutes to run and is disabled by default
+  public void canWriteLotsOfIssuesConcurrently()
+      throws Exception {
+    canWriteLotsOfIssuesConcurrently(false);
+  }
+
+  @Test(enabled = false) // Load test takes several minutes to run and is disabled by default
+  public void canWriteLotsOfIssuesConcurrentlyWithBatching()
+      throws Exception {
+    canWriteLotsOfIssuesConcurrently(true);
+  }
+
+  private void canWriteLotsOfIssuesConcurrently(boolean useBatching)
+      throws Exception {
+    int threadCount = 10;
+    int contextsPerThread = 100;
+    int issuesPerContext = 10;
+
+    Stopwatch stopwatch = Stopwatch.createStarted();
+
+    ConcurrentHashSet<Exception> exceptions = new ConcurrentHashSet<>();
+    ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
+
+    for (int i = 0; i < threadCount; i++) {
+      int threadId = i;
+      forkJoinPool.submit(() -> {
+        try {
+          runLoadTestThread(repository, threadId, contextsPerThread, issuesPerContext, useBatching);
+        } catch (Exception ex) {
+          exceptions.add(ex);
+        }
+      });
+    }
+
+    forkJoinPool.shutdown();
+    assertThat(forkJoinPool.awaitTermination(30, TimeUnit.MINUTES)).isTrue();
+
+    if (!exceptions.isEmpty()) {
+      throw exceptions.stream().findFirst().get();
+    }
+
+    int totalIssues = threadCount * contextsPerThread * issuesPerContext;
+    System.out.printf("Created %d issues in %d ms. Speed: %d issues/second%n", totalIssues,
+                      stopwatch.elapsed(TimeUnit.MILLISECONDS), totalIssues / stopwatch.elapsed(TimeUnit.SECONDS));
+  }
+
+  private void runLoadTestThread(MySqlMultiContextIssueRepository repository, int threadNumber, int contextsPerThread,
+      int issuesPerContext, boolean useBatching) {
+    Random random = new Random(threadNumber);
+    try {
+      for (int i = 0; i < contextsPerThread; i++) {
+        String contextId = "load-test-" + testId + "-thread-" + threadNumber + "-context-" + i;
+        List<Issue> issues = new ArrayList<>();
+        for (int j = 0; j < issuesPerContext; j++) {
+          Issue issue = getLargeTestIssue("load-test-1-" + random.nextInt(), "code-" + random.nextInt());
+          issues.add(issue);
+          if (!useBatching) {
+            repository.put(contextId, issue);
+          }
+        }
+        if (useBatching) {
+          repository.put(contextId, issues);
+        }
+        List<Issue> retrievedIssues = repository.getAll(contextId);
+        assertThat(retrievedIssues).usingRecursiveComparison().isEqualTo(issues);
+      }
+    } catch (TroubleshooterException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Issue getTestIssue(String summary, String code) {
+    return Issue.builder().summary(summary).code(code)
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.ERROR)
+        .details("test details for " + summary).build();
+  }
+
+  private Issue getLargeTestIssue(String summary, String code) {
+    HashMap<String, String> properties = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      properties.put("test.property" + i, RandomStringUtils.random(100));
+    }
+
+    Issue.IssueBuilder issue = Issue.builder();
+    issue.summary(summary);
+    issue.code(code);
+    issue.time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC));
+    issue.severity(IssueSeverity.ERROR);
+    issue.details(RandomStringUtils.random(3000));
+    issue.sourceClass(RandomStringUtils.random(100));
+    issue.exceptionClass(RandomStringUtils.random(100));
+    issue.properties(properties);
+
+    return issue.build();
+  }
+}
\ No newline at end of file
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 4a7c480..9bf055c 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -23,6 +23,7 @@ dependencyManagement {
 
 ext.externalDependency = [
     "antlrRuntime": "org.antlr:antlr-runtime:3.5.2",
+    "assertj": "org.assertj:assertj-core:3.20.2",
     "avro": "org.apache.avro:avro:" + avroVersion,
     "avroMapredH2": "org.apache.avro:avro-mapred:" + avroVersion,
     "awsCore": "com.amazonaws:aws-java-sdk-core:" + awsVersion,
@@ -35,6 +36,7 @@ ext.externalDependency = [
     "commonsCli": "commons-cli:commons-cli:1.3.1",
     "commonsCodec": "commons-codec:commons-codec:1.10",
     "commonsDbcp": "commons-dbcp:commons-dbcp:1.4",
+    "commonsDbcp2": "org.apache.commons:commons-dbcp2:2.8.0",
     "commonsEmail": "org.apache.commons:commons-email:1.4",
     "commonsLang": "commons-lang:commons-lang:2.6",
     "commonsLang3": "org.apache.commons:commons-lang3:3.4",
@@ -129,7 +131,7 @@ ext.externalDependency = [
     "lombok":"org.projectlombok:lombok:1.18.16",
     "mockRunnerJdbc":"com.mockrunner:mockrunner-jdbc:1.0.8",
     "xerces":"xerces:xercesImpl:2.11.0",
-    "typesafeConfig": "com.typesafe:config:1.2.1",
+    "typesafeConfig": "com.typesafe:config:1.4.1",
     "byteman": "org.jboss.byteman:byteman:" + bytemanVersion,
     "bytemanBmunit": "org.jboss.byteman:byteman-bmunit:" + bytemanVersion,
     "bcpgJdk15on": "org.bouncycastle:bcpg-jdk15on:1.52",
@@ -168,7 +170,7 @@ ext.externalDependency = [
     "reflections" : "org.reflections:reflections:0.9.10",
     "embeddedProcess": "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.2",
     "testMysqlServer": "com.wix:wix-embedded-mysql:4.6.1",
-    "flyway": "org.flywaydb:flyway-core:3.2.1",
+    "flyway": "org.flywaydb:flyway-core:7.9.2",
     "oltu": "org.apache.oltu.oauth2:org.apache.oltu.oauth2.client:1.0.2",
     "googleAnalytics": "com.google.apis:google-api-services-analytics:v3-rev134-1.22.0",
     "googleDrive": "com.google.apis:google-api-services-drive:v3-rev42-1.22.0",
@@ -204,7 +206,8 @@ ext.externalDependency = [
         "org.slf4j:slf4j-log4j12:" + slf4jVersion
     ],
     "postgresConnector": "org.postgresql:postgresql:42.1.4",
-    "assertj": 'org.assertj:assertj-core:3.8.0',
+    "testContainers": "org.testcontainers:testcontainers:1.15.3",
+    "testContainersMysql": "org.testcontainers:mysql:1.15.3"
 ]
 
 if (!isDefaultEnvironment)