You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/08/12 04:01:12 UTC
[gobblin] branch master updated: [GOBBLIN-1457] Add persistence for
troubleshooter in Gobblin service (#3327)
This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new de3a694 [GOBBLIN-1457] Add persistence for troubleshooter in Gobblin service (#3327)
de3a694 is described below
commit de3a6941d0d9ab0eeb1035a8d8ea04aaa65bf07e
Author: Alex Prokofiev <ap...@linkedin.com>
AuthorDate: Wed Aug 11 21:01:05 2021 -0700
[GOBBLIN-1457] Add persistence for troubleshooter in Gobblin service (#3327)
Previously, the Gobblin service kept the last few job issues in memory.
In this commit, we add MySQL-based persistence for job issues.
We also introduce Flyway-based migrations for the Gobblin service DB, so that
we can adjust table schemas and add new tables in a consistent way
in the future.
---
.../apache/gobblin/service/ServiceConfigKeys.java | 34 ++
.../apache/gobblin/util/ClassAliasResolver.java | 3 +-
.../gobblin/metastore/DatabaseJobHistoryStore.java | 3 +-
.../apache/gobblin/metastore/MysqlStateStore.java | 5 +-
.../util/DatabaseJobHistoryStoreSchemaManager.java | 17 +-
.../metastore/util/MysqlDataSourceUtils.java | 32 +-
.../InMemoryMultiContextIssueRepository.java | 85 +++--
.../gobblin/runtime/troubleshooter/Issue.java | 10 +
.../troubleshooter/JobIssueEventHandler.java | 13 +-
.../MultiContextIssueRepository.java | 16 +-
.../InMemoryMultiContextIssueRepositoryTest.java | 4 +-
.../troubleshooter/JobIssueEventHandlerTest.java | 2 +-
.../MultiContextIssueRepositoryTest.java | 5 +-
gobblin-service/build.gradle | 5 +
.../modules/core/GobblinServiceGuiceModule.java | 19 +-
.../modules/core/GobblinServiceManager.java | 11 +
.../service/modules/db/ServiceDatabaseManager.java | 64 ++++
.../modules/db/ServiceDatabaseProvider.java | 22 +-
.../modules/db/ServiceDatabaseProviderImpl.java | 116 ++++++
.../modules/orchestration/Orchestrator.java | 6 +-
.../MySqlMultiContextIssueRepository.java | 292 ++++++++++++++
.../service/db/migration/V1__AddIssuesTable.sql | 55 +++
.../gobblin/service/GobblinServiceManagerTest.java | 12 +
.../gobblin/service/TestServiceDatabaseConfig.java | 22 +-
.../service/modules/core/GobblinServiceHATest.java | 14 +
.../modules/core/GobblinServiceRedirectTest.java | 13 +
.../MySqlMultiContextIssueRepositoryTest.java | 419 +++++++++++++++++++++
gradle/scripts/dependencyDefinitions.gradle | 9 +-
28 files changed, 1198 insertions(+), 110 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 773d3a0..7697bc2 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service;
+import java.time.Duration;
+
import org.apache.gobblin.annotation.Alpha;
@Alpha
@@ -147,4 +149,36 @@ public class ServiceConfigKeys {
public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
public static final String DAG_STORE_KEY_SEPARATION_CHARACTER = "_";
+
+ // Service database connection
+
+ public static final String SERVICE_DB_URL_KEY = GOBBLIN_SERVICE_PREFIX + "db.url";
+ public static final String SERVICE_DB_USERNAME = GOBBLIN_SERVICE_PREFIX + "db.username";
+ public static final String SERVICE_DB_PASSWORD = GOBBLIN_SERVICE_PREFIX + "db.password";
+ public static final String SERVICE_DB_MAX_CONNECTIONS = GOBBLIN_SERVICE_PREFIX + "db.maxConnections";
+ public static final String SERVICE_DB_MAX_CONNECTION_LIFETIME = GOBBLIN_SERVICE_PREFIX + "db.maxConnectionLifetime";
+
+ // Mysql-based issues repository
+ public static final String MYSQL_ISSUE_REPO_PREFIX = GOBBLIN_SERVICE_PREFIX + "issueRepo.mysql.";
+
+ public static final String MYSQL_ISSUE_REPO_CLEANUP_INTERVAL = MYSQL_ISSUE_REPO_PREFIX + "cleanupInterval";
+ public static final Duration DEFAULT_MYSQL_ISSUE_REPO_CLEANUP_INTERVAL = Duration.ofHours(1);
+
+ public static final String MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP = MYSQL_ISSUE_REPO_PREFIX + "maxIssuesToKeep";
+ public static final long DEFAULT_MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP = 10 * 1000 * 1000;
+
+ public static final String MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN =
+ MYSQL_ISSUE_REPO_PREFIX + "deleteIssuesOlderThan";
+ public static final Duration DEFAULT_MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN = Duration.ofDays(30);
+
+ // In-memory issue repository
+ public static final String MEMORY_ISSUE_REPO_PREFIX = GOBBLIN_SERVICE_PREFIX + "issueRepo.memory.";
+
+ public static final String MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT = MEMORY_ISSUE_REPO_PREFIX + "maxContextCount";
+ public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT = 100;
+
+ public static final String MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT = MEMORY_ISSUE_REPO_PREFIX + "maxIssuesPerContext";
+ public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
+
+ public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
}
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
index 97708ef..4cb2c7d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
@@ -101,7 +101,8 @@ public class ClassAliasResolver<T> {
* and is also a subclass of {@link #subtypeOf}, if it fails it returns a class object for name
* <code>aliasOrClassName</code>.
*/
- public Class<? extends T> resolveClass(final String aliasOrClassName) throws ClassNotFoundException {
+ public Class<? extends T> resolveClass(final String aliasOrClassName)
+ throws ClassNotFoundException {
if (this.aliasToClassCache.containsKey(aliasOrClassName.toUpperCase())) {
return this.aliasToClassCache.get(aliasOrClassName.toUpperCase());
}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java
index 280005d..19a81b0 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatabaseJobHistoryStore.java
@@ -87,8 +87,7 @@ public class DatabaseJobHistoryStore implements JobHistoryStore {
}
private static MigrationVersion getDatabaseVersion(DataSource dataSource) throws FlywayException {
- Flyway flyway = new Flyway();
- flyway.setDataSource(dataSource);
+ Flyway flyway = Flyway.configure().dataSource(dataSource).load();
MigrationInfoService info = flyway.info();
MigrationVersion currentVersion = MigrationVersion.EMPTY;
if (info.current() != null) {
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 3b54f42..3e01344 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.metastore.util.MysqlDataSourceUtils;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.io.StreamUtils;
@@ -187,9 +188,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
basicDataSource.setDriverClassName(ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_JDBC_DRIVER_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER));
// MySQL server can timeout a connection so need to validate connections before use
- // This query will fail if db is in read-only mode, otherwise read-only connections may continue to fail and not get evicted
- // See https://stackoverflow.com/questions/39552146/evicting-connections-to-a-read-only-node-in-a-cluster-from-the-connection-pool
- basicDataSource.setValidationQuery("select case when @@read_only = 0 then 1 else (select table_name from information_schema.tables) end as `1`");
+ basicDataSource.setValidationQuery(MysqlDataSourceUtils.QUERY_CONNECTION_IS_VALID_AND_NOT_READONLY);
basicDataSource.setTestOnBorrow(true);
basicDataSource.setDefaultAutoCommit(false);
basicDataSource.setTimeBetweenEvictionRunsMillis(60000);
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java
index ba1aa08..1ad50c1 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/DatabaseJobHistoryStoreSchemaManager.java
@@ -23,23 +23,24 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import com.google.common.io.Closer;
-
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.runtime.cli.CliApplication;
+import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.MigrationInfoService;
-import org.flywaydb.core.Flyway;
import org.flywaydb.core.internal.info.MigrationInfoDumper;
+import com.google.common.io.Closer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.metastore.DatabaseJobHistoryStore;
+import org.apache.gobblin.runtime.cli.CliApplication;
/**
@@ -54,9 +55,7 @@ public class DatabaseJobHistoryStoreSchemaManager implements CliApplication, Clo
private final Flyway flyway;
private DatabaseJobHistoryStoreSchemaManager(Properties properties) {
- flyway = new Flyway();
- flyway.configure(properties);
- flyway.setClassLoader(this.getClass().getClassLoader());
+ flyway = Flyway.configure(this.getClass().getClassLoader()).configuration(properties).load();
}
public static DataSourceBuilder builder() {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/MysqlDataSourceUtils.java
similarity index 52%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
copy to gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/MysqlDataSourceUtils.java
index e3eafbd..1c141a7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/MysqlDataSourceUtils.java
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.metastore.util;
-import java.util.List;
-
-/**
- * Stores issues from multiple jobs, flows and other contexts
- *
- * @see AutomaticTroubleshooter
- * */
-public interface MultiContextIssueRepository {
-
- List<Issue> getAll(String contextId)
- throws TroubleshooterException;
-
- void put(String contextId, Issue issue)
- throws TroubleshooterException;
-
- void remove(String contextId, String issueCode)
- throws TroubleshooterException;
+public final class MysqlDataSourceUtils {
+ /**
+ * This query will validate that the MySQL connection is active and the MySQL instance is writable.
+ *
+ * If a database failover happened, and current replica became read-only, this query will fail and
+ * connection will be removed from the pool.
+ *
+ * See https://stackoverflow.com/questions/39552146/evicting-connections-to-a-read-only-node-in-a-cluster-from-the-connection-pool
+ * */
+ public static final String QUERY_CONNECTION_IS_VALID_AND_NOT_READONLY =
+ "select case when @@read_only = 0 then 1 else (select table_name from information_schema.tables) end as `1`";
+ private MysqlDataSourceUtils() {
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
index 6727f3c..530798d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
@@ -19,47 +19,42 @@ package org.apache.gobblin.runtime.troubleshooter;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.commons.collections4.map.LRUMap;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
/**
* Stores issues from multiple jobs, flows or other contexts in memory.
*
- * To limit the memory consumption, it will keep only the last {@link #MAX_CONTEXT_COUNT} contexts,
+ * To limit the memory consumption, it will keep only the last {@link Configuration#maxContextCount} contexts,
* and older ones will be discarded.
* */
@Singleton
-public class InMemoryMultiContextIssueRepository implements MultiContextIssueRepository {
- public static final int DEFAULT_MAX_CONTEXT_COUNT = 100;
-
- public static final String CONFIG_PREFIX = "gobblin.troubleshooter.inMemoryIssueRepository.";
- public static final String MAX_CONTEXT_COUNT = CONFIG_PREFIX + "maxContextCount";
- public static final String MAX_ISSUE_PER_CONTEXT = CONFIG_PREFIX + "maxIssuesPerContext";
-
+public class InMemoryMultiContextIssueRepository extends AbstractIdleService implements MultiContextIssueRepository {
private final LRUMap<String, InMemoryIssueRepository> contextIssues;
- private final int maxIssuesPerContext;
+ private final Configuration configuration;
public InMemoryMultiContextIssueRepository() {
- this(ConfigFactory.empty());
+ this(Configuration.builder().build());
}
@Inject
- public InMemoryMultiContextIssueRepository(Config config) {
- this(ConfigUtils.getInt(config, MAX_CONTEXT_COUNT, DEFAULT_MAX_CONTEXT_COUNT),
- ConfigUtils.getInt(config, MAX_ISSUE_PER_CONTEXT, InMemoryIssueRepository.DEFAULT_MAX_SIZE));
- }
-
- public InMemoryMultiContextIssueRepository(int maxContextCount, int maxIssuesPerContext) {
- contextIssues = new LRUMap<>(maxContextCount);
- this.maxIssuesPerContext = maxIssuesPerContext;
+ public InMemoryMultiContextIssueRepository(Configuration configuration) {
+ this.configuration = Objects.requireNonNull(configuration);
+ contextIssues = new LRUMap<>(configuration.getMaxContextCount());
}
@Override
@@ -78,14 +73,23 @@ public class InMemoryMultiContextIssueRepository implements MultiContextIssueRep
@Override
public synchronized void put(String contextId, Issue issue)
throws TroubleshooterException {
-
- InMemoryIssueRepository issueRepository =
- contextIssues.computeIfAbsent(contextId, s -> new InMemoryIssueRepository(maxIssuesPerContext));
+ InMemoryIssueRepository issueRepository = contextIssues
+ .computeIfAbsent(contextId, s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));
issueRepository.put(issue);
}
@Override
+ public synchronized void put(String contextId, List<Issue> issues)
+ throws TroubleshooterException {
+
+ InMemoryIssueRepository issueRepository = contextIssues
+ .computeIfAbsent(contextId, s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));
+
+ issueRepository.put(issues);
+ }
+
+ @Override
public synchronized void remove(String contextId, String issueCode)
throws TroubleshooterException {
@@ -95,4 +99,39 @@ public class InMemoryMultiContextIssueRepository implements MultiContextIssueRep
issueRepository.remove(issueCode);
}
}
+
+ @Override
+ protected void startUp()
+ throws Exception {
+ }
+
+ @Override
+ protected void shutDown()
+ throws Exception {
+ }
+
+ @Builder
+ @Getter
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class Configuration {
+
+ @Builder.Default
+ private int maxContextCount = ServiceConfigKeys.DEFAULT_MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT;
+
+ @Builder.Default
+ private int maxIssuesPerContext = ServiceConfigKeys.DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT;
+
+ @Inject
+ public Configuration(Config innerConfig) {
+ this();
+ if (innerConfig.hasPath(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT)) {
+ maxContextCount = innerConfig.getInt(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_CONTEXT_COUNT);
+ }
+
+ if (innerConfig.hasPath(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT)) {
+ maxIssuesPerContext = innerConfig.getInt(ServiceConfigKeys.MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT);
+ }
+ }
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
index de4bafd..7f694b2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
@@ -24,6 +24,7 @@ import java.util.Map;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.ToString;
/**
@@ -36,7 +37,10 @@ import lombok.Getter;
@Builder
@Getter
@EqualsAndHashCode
+@ToString
public class Issue {
+ public static final int MAX_ISSUE_CODE_LENGTH = 100;
+ public static final int MAX_CLASSNAME_LENGTH = 1000;
private final ZonedDateTime time;
private final IssueSeverity severity;
@@ -45,6 +49,8 @@ public class Issue {
* Unique code that identifies a specific problem.
*
* It can be used for making programmatic decisions on how to handle and recover from this issue.
+ *
+ * The code length should not exceed {@link #MAX_ISSUE_CODE_LENGTH}.
* */
private final String code;
@@ -64,11 +70,15 @@ public class Issue {
* Unique name of the component that produced the issue.
*
* This is a full name of the class that logged the error or generated the issue.
+ *
+ * The class name length should not exceed {@link #MAX_CLASSNAME_LENGTH}.
* */
private final String sourceClass;
/**
* If the issue was generated from an exception, then a full exception class name should be stored here.
+ *
+ * The class name length should not exceed {@link #MAX_CLASSNAME_LENGTH}.
*/
private final String exceptionClass;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
index bccd01b..a6e7292 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
@@ -45,22 +45,22 @@ import org.apache.gobblin.util.ConfigUtils;
public class JobIssueEventHandler {
public static final String CONFIG_PREFIX = "gobblin.troubleshooter.jobIssueEventHandler.";
- public static final String LOG_RECEIVED_EVENTS = CONFIG_PREFIX + "logReceiveEvents";
+ public static final String LOG_RECEIVED_EVENTS = CONFIG_PREFIX + "logReceivedEvents";
private static final Logger issueLogger =
LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
private final MultiContextIssueRepository issueRepository;
- private final boolean logReceiveEvents;
+ private final boolean logReceivedEvents;
@Inject
public JobIssueEventHandler(MultiContextIssueRepository issueRepository, Config config) {
this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS, true));
}
- public JobIssueEventHandler(MultiContextIssueRepository issueRepository, boolean logReceiveEvents) {
+ public JobIssueEventHandler(MultiContextIssueRepository issueRepository, boolean logReceivedEvents) {
this.issueRepository = Objects.requireNonNull(issueRepository);
- this.logReceiveEvents = logReceiveEvents;
+ this.logReceivedEvents = logReceivedEvents;
}
public void processEvent(GobblinTrackingEvent event) {
@@ -83,10 +83,11 @@ public class JobIssueEventHandler {
try {
issueRepository.put(contextId, issueEvent.getIssue());
} catch (TroubleshooterException e) {
- log.warn("Failed to save issue to repository. Issue code: " + issueEvent.getIssue().getCode(), e);
+ log.warn(String.format("Failed to save issue to repository. Issue time: %s, code: %s",
+ issueEvent.getIssue().getTime(), issueEvent.getIssue().getCode()), e);
}
- if (logReceiveEvents) {
+ if (logReceivedEvents) {
logEvent(issueEvent);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
index e3eafbd..1d30ed3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
@@ -19,20 +19,30 @@ package org.apache.gobblin.runtime.troubleshooter;
import java.util.List;
+import com.google.common.util.concurrent.Service;
+
+
/**
- * Stores issues from multiple jobs, flows and other contexts
+ * Stores issues from multiple jobs, flows and other contexts.
+ *
+ * For each context, there can only be one issue with a specific code.
*
* @see AutomaticTroubleshooter
* */
-public interface MultiContextIssueRepository {
+public interface MultiContextIssueRepository extends Service {
+ /**
+ * Will return issues in the same order as they were put into the repository.
+ * */
List<Issue> getAll(String contextId)
throws TroubleshooterException;
void put(String contextId, Issue issue)
throws TroubleshooterException;
- void remove(String contextId, String issueCode)
+ void put(String contextId, List<Issue> issues)
throws TroubleshooterException;
+ void remove(String contextId, String issueCode)
+ throws TroubleshooterException;
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
index f4f3e97..241b20d 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
@@ -34,7 +34,9 @@ public class InMemoryMultiContextIssueRepositoryTest extends MultiContextIssueRe
int jobCount = 100;
int jobCapacity = 50;
- MultiContextIssueRepository repository = new InMemoryMultiContextIssueRepository(50, 10);
+ MultiContextIssueRepository repository = new InMemoryMultiContextIssueRepository(
+ InMemoryMultiContextIssueRepository.Configuration.builder().maxContextCount(50).maxIssuesPerContext(10)
+ .build());
for (int j = 0; j < jobCount; j++) {
repository.put("job" + j, getTestIssue("issue 1", "code1"));
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
index b479935..c03f81f 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
@@ -43,7 +43,7 @@ public class JobIssueEventHandlerTest {
eventHandler.processEvent(eventBuilder.build());
- verify(issueRepository).put(any(), any());
+ verify(issueRepository).put(any(), (Issue) any());
}
private Issue getTestIssue(String summary, String code) {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
index a524408..1244cca 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
@@ -21,6 +21,8 @@ import java.util.List;
import org.testng.annotations.Test;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
import static org.testng.Assert.assertEquals;
@@ -90,7 +92,7 @@ public abstract class MultiContextIssueRepositoryTest {
throws Exception {
int jobCount = 10;
- int issueCount = 50;
+ int issueCount = ServiceConfigKeys.DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT;
MultiContextIssueRepository repository = getRepository();
@@ -102,6 +104,7 @@ public abstract class MultiContextIssueRepositoryTest {
for (int j = 0; j < jobCount; j++) {
List<Issue> retrievedIssues = repository.getAll("job" + j);
+ assertEquals(retrievedIssues.size(), issueCount);
for (int i = 0; i < issueCount; i++) {
assertEquals(String.valueOf(i), retrievedIssues.get(i).getCode());
}
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index c32885b..68f3ac7 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -43,7 +43,9 @@ dependencies {
compile externalDependency.curatorFramework
compile externalDependency.curatorClient
compile externalDependency.curatorRecipes
+ compile externalDependency.commonsDbcp2
compile externalDependency.findBugsAnnotations
+ compile externalDependency.flyway
compile externalDependency.gson
compile externalDependency.guava
compile externalDependency.guavaretrying
@@ -76,6 +78,7 @@ dependencies {
testCompile project(":gobblin-runtime").sourceSets.test.output
testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
testCompile project(":gobblin-test-utils")
+ testCompile externalDependency.assertj
testCompile externalDependency.byteman
testCompile externalDependency.bytemanBmunit
testCompile externalDependency.calciteCore
@@ -86,6 +89,8 @@ dependencies {
testCompile externalDependency.hamcrest
testCompile externalDependency.jhyde
testCompile externalDependency.mockito
+ testCompile externalDependency.testContainers
+ testCompile externalDependency.testContainersMysql
}
// Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 21db6d7..41e0174 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -59,6 +59,9 @@ import org.apache.gobblin.service.GroupOwnershipService;
import org.apache.gobblin.service.NoopRequesterService;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
@@ -66,6 +69,7 @@ import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2Resou
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
+import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.modules.utils.InjectionNames;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -205,12 +209,23 @@ public class GobblinServiceGuiceModule implements Module {
JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
if (serviceConfig.isRestLIServerEnabled()) {
- binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class).in(Singleton.class);
+ binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class);
}
binder.bind(GobblinServiceManager.class);
- binder.bind(MultiContextIssueRepository.class).to(InMemoryMultiContextIssueRepository.class);
+ binder.bind(ServiceDatabaseProvider.class).to(ServiceDatabaseProviderImpl.class);
+ binder.bind(ServiceDatabaseProviderImpl.Configuration.class);
+
+ binder.bind(ServiceDatabaseManager.class);
+
+ binder.bind(MultiContextIssueRepository.class)
+ .to(getClassByNameOrAlias(MultiContextIssueRepository.class, serviceConfig.getInnerConfig(),
+ ServiceConfigKeys.ISSUE_REPO_CLASS,
+ InMemoryMultiContextIssueRepository.class.getName()));
+
+ binder.bind(MySqlMultiContextIssueRepository.Configuration.class);
+ binder.bind(InMemoryMultiContextIssueRepository.Configuration.class);
binder.bind(JobIssueEventHandler.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8c2a49c..9bd6cd8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -78,6 +78,7 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
@@ -90,6 +91,7 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.GroupOwnershipService;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
@@ -189,6 +191,12 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
@Inject(optional = true)
protected KafkaJobStatusMonitor jobStatusMonitor;
+ @Inject
+ protected MultiContextIssueRepository issueRepository;
+
+ @Inject
+ protected ServiceDatabaseManager databaseManager;
+
protected Optional<HelixLeaderState> helixLeaderGauges;
@Inject(optional = true)
@@ -348,6 +356,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
this.serviceLauncher.addService(dagManager);
}
+ this.serviceLauncher.addService(databaseManager);
+ this.serviceLauncher.addService(issueRepository);
+
if (configuration.isJobStatusMonitorEnabled()) {
this.serviceLauncher.addService(jobStatusMonitor);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseManager.java
new file mode 100644
index 0000000..da1ad05
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.db;
+
+import java.util.Objects;
+
+import org.flywaydb.core.Flyway;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Brings the Gobblin service database schema up to date when the service starts.
+ *
+ * <p>Schema changes are applied with Flyway, using the migration scripts found on the classpath under
+ * {@code org/apache/gobblin/service/db/migration} (i.e. resources/org/apache/gobblin/service/db/migration).
+ * Startup fails if that location is missing.
+ */
+@Singleton
+@Slf4j
+public class ServiceDatabaseManager extends AbstractIdleService {
+
+  /** Flyway location that holds the versioned migration scripts. */
+  private static final String MIGRATIONS_LOCATION = "classpath:org/apache/gobblin/service/db/migration";
+
+  private final ServiceDatabaseProvider databaseProvider;
+
+  @Inject
+  public ServiceDatabaseManager(ServiceDatabaseProvider databaseProvider) {
+    this.databaseProvider = Objects.requireNonNull(databaseProvider);
+  }
+
+  /**
+   * Applies all pending Flyway migrations to the database exposed by the {@link ServiceDatabaseProvider}.
+   */
+  @Override
+  protected void startUp()
+      throws Exception {
+    Flyway migrator = Flyway.configure()
+        .locations(MIGRATIONS_LOCATION)
+        .failOnMissingLocations(true)
+        .dataSource(databaseProvider.getDatasource())
+        .load();
+
+    log.info("Ensuring service database is migrated to latest schema");
+    migrator.migrate();
+  }
+
+  /**
+   * No-op: this service holds no resources of its own beyond the provider reference.
+   */
+  @Override
+  protected void shutDown()
+      throws Exception {
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProvider.java
similarity index 63%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
copy to gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProvider.java
index e3eafbd..da30441 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProvider.java
@@ -15,24 +15,16 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.service.modules.db;
+
+import javax.sql.DataSource;
-import java.util.List;
/**
- * Stores issues from multiple jobs, flows and other contexts
+ * Provides access to the Gobblin service database.
*
- * @see AutomaticTroubleshooter
+ * The DB schema is defined using Flyway migrations.
* */
-public interface MultiContextIssueRepository {
-
- List<Issue> getAll(String contextId)
- throws TroubleshooterException;
-
- void put(String contextId, Issue issue)
- throws TroubleshooterException;
-
- void remove(String contextId, String issueCode)
- throws TroubleshooterException;
-
+public interface ServiceDatabaseProvider {
+ /**
+ * Returns the {@link DataSource} used to open connections to the Gobblin service database.
+ */
+ DataSource getDatasource();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProviderImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProviderImpl.java
new file mode 100644
index 0000000..bb05ba6
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/db/ServiceDatabaseProviderImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.db;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.sql.DataSource;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import org.apache.gobblin.metastore.util.MysqlDataSourceUtils;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Default {@link ServiceDatabaseProvider}: a DBCP2 {@link BasicDataSource} connection pool configured from
+ * {@link Configuration}.
+ *
+ * <p>The pool is created on the first call to {@link #getDatasource()}; the same instance is returned afterwards.
+ */
+public class ServiceDatabaseProviderImpl implements ServiceDatabaseProvider {
+
+  private final Configuration configuration;
+
+  /** Lazily created pool; guarded by {@code this}. */
+  private BasicDataSource dataSource;
+
+  @Inject
+  public ServiceDatabaseProviderImpl(Configuration configuration) {
+    this.configuration = Objects.requireNonNull(configuration, "configuration cannot be null");
+  }
+
+  /**
+   * Returns the shared connection pool, creating it on first use.
+   *
+   * <p>Synchronized so that concurrent first callers create exactly one pool. The field is assigned only after
+   * the pool is fully configured, so a partially configured pool is never stored.
+   */
+  @Override
+  public synchronized DataSource getDatasource() {
+    if (dataSource == null) {
+      dataSource = createDataSource(configuration);
+    }
+    return dataSource;
+  }
+
+  /** Builds and configures a new connection pool from {@code configuration}. */
+  private static BasicDataSource createDataSource(Configuration configuration) {
+    BasicDataSource pool = new BasicDataSource();
+
+    pool.setUrl(configuration.getUrl());
+    pool.setUsername(configuration.getUserName());
+    pool.setPassword(configuration.getPassword());
+
+    // MySQL server can timeout a connection so we need to validate connections.
+    pool.setValidationQuery(MysqlDataSourceUtils.QUERY_CONNECTION_IS_VALID_AND_NOT_READONLY);
+    pool.setValidationQueryTimeout(5);
+
+    // To improve performance, we only check connections on creation, and set a maximum connection lifetime
+    // If database goes to read-only mode, then connection would not work correctly for up to configured lifetime
+    pool.setTestOnCreate(true);
+    pool.setMaxConnLifetimeMillis(configuration.getMaxConnectionLifetime().toMillis());
+    pool.setTimeBetweenEvictionRunsMillis(Duration.ofSeconds(10).toMillis());
+    pool.setMinIdle(2);
+    pool.setMaxTotal(configuration.getMaxConnections());
+
+    return pool;
+  }
+
+  /**
+   * Connection settings for the service database.
+   *
+   * <p>The {@link Config}-based constructor requires {@link ServiceConfigKeys#SERVICE_DB_URL_KEY},
+   * {@link ServiceConfigKeys#SERVICE_DB_USERNAME} and {@link ServiceConfigKeys#SERVICE_DB_PASSWORD}. The password
+   * value is resolved through {@link PasswordManager}. The maximum connection count and lifetime are optional and
+   * default to 100 and 1 minute respectively.
+   */
+  @Builder
+  @AllArgsConstructor
+  @Getter
+  @NoArgsConstructor
+  public static class Configuration {
+
+    private String url;
+    private String userName;
+    private String password;
+
+    @Builder.Default
+    private Duration maxConnectionLifetime = Duration.ofMinutes(1);
+
+    @Builder.Default
+    private int maxConnections = 100;
+
+    @Inject
+    public Configuration(Config config) {
+      this(); // applies the @Builder.Default values, see https://github.com/projectlombok/lombok/issues/1347
+      Objects.requireNonNull(config, "Config cannot be null");
+
+      url = config.getString(ServiceConfigKeys.SERVICE_DB_URL_KEY);
+
+      PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));
+
+      userName = config.getString(ServiceConfigKeys.SERVICE_DB_USERNAME);
+      password = passwordManager.readPassword(config.getString(ServiceConfigKeys.SERVICE_DB_PASSWORD));
+
+      if (config.hasPath(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTIONS)) {
+        maxConnections = config.getInt(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTIONS);
+      }
+
+      if (config.hasPath(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTION_LIFETIME)) {
+        maxConnectionLifetime = config.getDuration(ServiceConfigKeys.SERVICE_DB_MAX_CONNECTION_LIFETIME);
+      }
+    }
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index b256ae2..9058fb0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -105,8 +105,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
- public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<Logger> log, boolean instrumentationEnabled) {
+ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
+ FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
@@ -154,7 +154,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<Logger> log) {
- this(config, flowStatusGenerator, topologyCatalog, dagManager, log, true);
+ this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
new file mode 100644
index 0000000..3c4c60d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.troubleshooter;
+
+import java.lang.reflect.Type;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.util.GsonUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
+
+
+/**
+ * A {@link MultiContextIssueRepository} that persists issues in the {@code issues} table of the Gobblin service
+ * database.
+ *
+ * <p>While the service is running, a background task runs every {@code cleanupInterval}. It deletes issues older
+ * than {@code deleteIssuesOlderThan}, then deletes the oldest issues so that at most {@code maxIssuesToKeep} remain.
+ */
+@Singleton
+@Slf4j
+public class MySqlMultiContextIssueRepository extends AbstractIdleService implements MultiContextIssueRepository {
+
+  /** Gson target type of the JSON stored in the {@code properties} column; immutable, so shared across calls. */
+  private static final Type PROPERTIES_MAP_TYPE = new TypeToken<HashMap<String, String>>() {
+  }.getType();
+
+  private final ServiceDatabaseProvider databaseProvider;
+  private final Configuration configuration;
+  private ScheduledExecutorService scheduledExecutor;
+
+  /** Creates a repository that uses the default {@link Configuration}. */
+  public MySqlMultiContextIssueRepository(ServiceDatabaseProvider databaseProvider) {
+    this(databaseProvider, MySqlMultiContextIssueRepository.Configuration.builder().build());
+  }
+
+  @Inject
+  public MySqlMultiContextIssueRepository(ServiceDatabaseProvider databaseProvider, Configuration configuration) {
+    this.databaseProvider = Objects.requireNonNull(databaseProvider);
+    this.configuration = Objects.requireNonNull(configuration);
+  }
+
+  /**
+   * Returns all issues stored for {@code contextId}, ordered by the auto-increment {@code position} column.
+   *
+   * @throws TroubleshooterException if the issues cannot be read from the database
+   */
+  @Override
+  public List<Issue> getAll(String contextId)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+
+    String querySql = "select code, time, severity, summary, details, source_class, exception_class, properties "
+        + "from issues where context_id = ? order by position";
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(querySql)) {
+
+      statement.setString(1, contextId);
+
+      ArrayList<Issue> issues = new ArrayList<>();
+      try (ResultSet results = statement.executeQuery()) {
+        while (results.next()) {
+          issues.add(readIssue(results));
+        }
+      }
+      return issues;
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot read issues from the database", e);
+    }
+  }
+
+  /** Maps the current row of the {@link #getAll} query (columns in select order) to an {@link Issue}. */
+  private static Issue readIssue(ResultSet results)
+      throws SQLException {
+    Issue.IssueBuilder issue = Issue.builder();
+    issue.code(results.getString(1));
+    issue.time(ZonedDateTime.ofInstant(Instant.ofEpochMilli(results.getTimestamp(2).getTime()), ZoneOffset.UTC));
+    issue.severity(IssueSeverity.valueOf(results.getString(3)));
+    issue.summary(results.getString(4));
+    issue.details(results.getString(5));
+    issue.sourceClass(results.getString(6));
+    issue.exceptionClass(results.getString(7));
+
+    String serializedProperties = results.getString(8);
+    if (serializedProperties != null) {
+      HashMap<String, String> properties =
+          GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(serializedProperties, PROPERTIES_MAP_TYPE);
+      issue.properties(properties);
+    }
+
+    return issue.build();
+  }
+
+  @Override
+  public void put(String contextId, Issue issue)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+    Objects.requireNonNull(issue, "issue should not be null");
+
+    put(contextId, Collections.singletonList(issue));
+  }
+
+  /**
+   * Inserts or replaces (keyed by context id and issue code) all {@code issues} in a single transaction.
+   *
+   * <p>If any row fails, the transaction is rolled back so no partial batch is left pending on the connection.
+   *
+   * @throws TroubleshooterException if the issues cannot be written
+   */
+  @Override
+  public void put(String contextId, List<Issue> issues)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+    Objects.requireNonNull(issues, "issues should not be null");
+
+    if (issues.isEmpty()) {
+      return; // nothing to write; avoid borrowing a connection
+    }
+
+    String statementSql =
+        "replace into issues (context_id, code, time, severity,summary,details,source_class,exception_class,properties) "
+            + "values (?,?,?,?,?,?,?,?,?)";
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(statementSql)) {
+      connection.setAutoCommit(false);
+      try {
+        for (Issue issue : issues) {
+          bindIssue(statement, contextId, issue);
+          statement.addBatch();
+        }
+        statement.executeBatch();
+        connection.commit();
+      } catch (SQLException | RuntimeException e) {
+        // Do not depend on the pool rolling back on return: discard the partial transaction explicitly.
+        rollbackQuietly(connection, e);
+        throw e;
+      }
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot save issue to the database", e);
+    }
+  }
+
+  /** Binds all nine parameters of the {@code replace into issues} statement for one issue. */
+  private static void bindIssue(PreparedStatement statement, String contextId, Issue issue)
+      throws SQLException {
+    statement.setString(1, contextId);
+    statement.setString(2, issue.getCode());
+    statement.setTimestamp(3, new Timestamp(issue.getTime().toInstant().toEpochMilli()));
+    statement.setString(4, issue.getSeverity().toString());
+    statement.setString(5, issue.getSummary());
+    statement.setString(6, issue.getDetails());
+    statement.setString(7, issue.getSourceClass());
+    statement.setString(8, issue.getExceptionClass());
+
+    String serializedProperties = null;
+    if (issue.getProperties() != null) {
+      serializedProperties = GsonUtils.GSON_WITH_DATE_HANDLING.toJson(issue.getProperties());
+    }
+    statement.setString(9, serializedProperties);
+  }
+
+  /** Rolls back the current transaction; a rollback failure is attached to {@code cause} as suppressed. */
+  private static void rollbackQuietly(Connection connection, Exception cause) {
+    try {
+      connection.rollback();
+    } catch (SQLException rollbackException) {
+      cause.addSuppressed(rollbackException);
+    }
+  }
+
+  @Override
+  public void remove(String contextId, String issueCode)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+    Objects.requireNonNull(issueCode, "issueCode should not be null");
+
+    String statementSql = "delete from issues where context_id=? and code=?";
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(statementSql)) {
+      statement.setString(1, contextId);
+      statement.setString(2, issueCode);
+
+      statement.executeUpdate();
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot remove issue from the database", e);
+    }
+  }
+
+  /** Schedules the periodic cleanup task, first run after one {@code cleanupInterval}. */
+  @Override
+  protected void startUp()
+      throws Exception {
+    scheduledExecutor = Executors.newScheduledThreadPool(1);
+    scheduledExecutor.scheduleAtFixedRate(this::cleanupOldIssues, configuration.cleanupInterval.toMillis(),
+        configuration.cleanupInterval.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+    scheduledExecutor.shutdown();
+  }
+
+  /**
+   * Runs both cleanup passes. Failures are logged rather than thrown, because an exception escaping a
+   * {@code scheduleAtFixedRate} task would cancel all later runs.
+   */
+  private void cleanupOldIssues() {
+    try {
+      deleteIssuesOlderThan(ZonedDateTime.now().minus(configuration.deleteIssuesOlderThan));
+      deleteOldIssuesOverTheCount(configuration.maxIssuesToKeep);
+    } catch (Exception ex) {
+      log.warn("Failed to cleanup old issues", ex);
+    }
+  }
+
+  /** Deletes every issue whose {@code time} is before {@code olderThanDate}. */
+  @VisibleForTesting
+  public void deleteIssuesOlderThan(ZonedDateTime olderThanDate)
+      throws SQLException {
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement("delete from issues where time < ?")) {
+
+      Instant deleteBefore = olderThanDate.toInstant();
+      statement.setTimestamp(1, new Timestamp(deleteBefore.toEpochMilli()));
+
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      int deletedIssues = statement.executeUpdate();
+      log.info("Deleted {} issues that are older than {} in {} ms", deletedIssues, deleteBefore,
+          stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    }
+  }
+
+  /** Deletes the oldest issues (lowest {@code position}) until at most {@code maxIssuesToKeep} remain. */
+  @VisibleForTesting
+  public void deleteOldIssuesOverTheCount(long maxIssuesToKeep)
+      throws SQLException {
+
+    try (Connection connection = databaseProvider.getDatasource().getConnection();
+        PreparedStatement countQuery = connection.prepareStatement("select count(*) from issues");
+        ResultSet resultSet = countQuery.executeQuery()) {
+
+      if (!resultSet.next()) {
+        return;
+      }
+      long totalIssueCount = resultSet.getLong(1);
+
+      long issuesToRemove = totalIssueCount - maxIssuesToKeep;
+      if (issuesToRemove <= 0) {
+        return;
+      }
+
+      // position is a table-wide auto-increment field. older issues will have smaller position.
+      try (PreparedStatement deleteStatement = connection
+          .prepareStatement("delete from issues order by position limit ?")) {
+
+        deleteStatement.setLong(1, issuesToRemove);
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        int deletedIssues = deleteStatement.executeUpdate();
+        log.info("Deleted {} issues to keep the total issue count under {} in {} ms", deletedIssues, maxIssuesToKeep,
+            stopwatch.elapsed(TimeUnit.MILLISECONDS));
+      }
+    }
+  }
+
+  /**
+   * Cleanup settings. Each value defaults to the matching {@code DEFAULT_MYSQL_ISSUE_REPO_*} constant in
+   * {@link ServiceConfigKeys} and can be overridden through the corresponding {@code MYSQL_ISSUE_REPO_*} key.
+   */
+  @Builder
+  @Getter
+  @AllArgsConstructor
+  @NoArgsConstructor
+  public static class Configuration {
+    @Builder.Default
+    private Duration cleanupInterval = ServiceConfigKeys.DEFAULT_MYSQL_ISSUE_REPO_CLEANUP_INTERVAL;
+
+    @Builder.Default
+    private long maxIssuesToKeep = ServiceConfigKeys.DEFAULT_MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP;
+
+    @Builder.Default
+    private Duration deleteIssuesOlderThan = ServiceConfigKeys.DEFAULT_MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN;
+
+    @Inject
+    public Configuration(Config innerConfig) {
+      this(); // see https://github.com/projectlombok/lombok/issues/1347
+      Objects.requireNonNull(innerConfig, "innerConfig cannot be null");
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MYSQL_ISSUE_REPO_CLEANUP_INTERVAL)) {
+        cleanupInterval = innerConfig.getDuration(ServiceConfigKeys.MYSQL_ISSUE_REPO_CLEANUP_INTERVAL);
+      }
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP)) {
+        maxIssuesToKeep = innerConfig.getLong(ServiceConfigKeys.MYSQL_ISSUE_REPO_MAX_ISSUES_TO_KEEP);
+      }
+
+      if (innerConfig.hasPath(ServiceConfigKeys.MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN)) {
+        deleteIssuesOlderThan = innerConfig.getDuration(ServiceConfigKeys.MYSQL_ISSUE_REPO_DELETE_ISSUES_OLDER_THAN);
+      }
+    }
+  }
+}
diff --git a/gobblin-service/src/main/resources/org/apache/gobblin/service/db/migration/V1__AddIssuesTable.sql b/gobblin-service/src/main/resources/org/apache/gobblin/service/db/migration/V1__AddIssuesTable.sql
new file mode 100644
index 0000000..ccb9ff8
--- /dev/null
+++ b/gobblin-service/src/main/resources/org/apache/gobblin/service/db/migration/V1__AddIssuesTable.sql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+create table issues
+(
+ context_id varchar(660) not null, -- context_id + code length should fit into mysql key size limits
+ code varchar(100) not null,
+ time datetime not null,
+ position bigint not null auto_increment, -- it is used to sort issues by insertion order
+ severity varchar(10) not null,
+ summary mediumtext not null,
+ details mediumtext null,
+ source_class varchar(1000) null,
+ exception_class varchar(1000) null,
+ properties json null,
+
+ constraint issues_pk
+ primary key (context_id, code),
+
+ key issues_position (position)
+);
+
+create index issues_time_index
+ on issues (time);
+
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 82eb270..2572fda 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -35,6 +35,7 @@ import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.util.FS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -101,6 +102,8 @@ public class GobblinServiceManagerTest {
private GobblinServiceManager gobblinServiceManager;
private FlowConfigV2Client flowConfigClient;
+ private MySQLContainer mysql;
+
private Git gitForPush;
private TestingServer testingServer;
Properties serviceCoreProperties = new Properties();
@@ -114,6 +117,13 @@ public class GobblinServiceManagerTest {
public void setup() throws Exception {
cleanUpDir(SERVICE_WORK_DIR);
cleanUpDir(SPEC_STORE_PARENT_DIR);
+
+ mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+ mysql.start();
+ serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_URL_KEY, mysql.getJdbcUrl());
+ serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, mysql.getUsername());
+ serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, mysql.getPassword());
+
ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
testingServer = new TestingServer(true);
@@ -212,6 +222,8 @@ public class GobblinServiceManagerTest {
} catch(Exception e) {
System.err.println("Failed to close ZK testing server.");
}
+
+ mysql.stop();
}
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-service/src/test/java/org/apache/gobblin/service/TestServiceDatabaseConfig.java
similarity index 62%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
copy to gobblin-service/src/test/java/org/apache/gobblin/service/TestServiceDatabaseConfig.java
index e3eafbd..801ff3d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/TestServiceDatabaseConfig.java
@@ -15,24 +15,8 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.troubleshooter;
-
-import java.util.List;
-
-/**
- * Stores issues from multiple jobs, flows and other contexts
- *
- * @see AutomaticTroubleshooter
- * */
-public interface MultiContextIssueRepository {
-
- List<Issue> getAll(String contextId)
- throws TroubleshooterException;
-
- void put(String contextId, Issue issue)
- throws TroubleshooterException;
-
- void remove(String contextId, String issueCode)
- throws TroubleshooterException;
+package org.apache.gobblin.service;
+/**
+ * Shared settings for tests that start a MySQL test container for the Gobblin service database.
+ */
+public final class TestServiceDatabaseConfig {
+  /** Tag of the {@code mysql} Docker image used by the test containers. */
+  public static final String MysqlVersion = "8.0.20";
+
+  private TestServiceDatabaseConfig() {
+    // constants holder; not meant to be instantiated
+  }
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 4263b31..3a03be7 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -47,6 +48,7 @@ import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.TestServiceDatabaseConfig;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -100,6 +102,8 @@ public class GobblinServiceHATest {
private TestingServer testingZKServer;
+ private MySQLContainer mysql;
+
@BeforeClass
public void setup() throws Exception {
// Clean up common Flow Spec Dir
@@ -118,9 +122,17 @@ public class GobblinServiceHATest {
logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);
+
ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
Properties commonServiceCoreProperties = new Properties();
+
+ mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+ mysql.start();
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_URL_KEY, mysql.getJdbcUrl());
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, mysql.getUsername());
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, mysql.getPassword());
+
commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, "GaaS_" + UUID.randomUUID().toString());
@@ -237,6 +249,8 @@ public class GobblinServiceHATest {
}
cleanUpDir(COMMON_SPEC_STORE_PARENT_DIR);
+
+ mysql.stop();
}
@Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index 003e318..4f6fbde 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -26,6 +26,7 @@ import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -47,6 +48,7 @@ import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.TestServiceDatabaseConfig;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -106,6 +108,8 @@ public class GobblinServiceRedirectTest {
private Properties node1ServiceCoreProperties;
private Properties node2ServiceCoreProperties;
+ private MySQLContainer mysql;
+
@BeforeClass
public void setup() throws Exception {
port1 = Integer.toString(new PortUtils.ServerSocketPortLocator().random());
@@ -122,6 +126,13 @@ public class GobblinServiceRedirectTest {
ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
Properties commonServiceCoreProperties = new Properties();
+
+ mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+ mysql.start();
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_URL_KEY, mysql.getJdbcUrl());
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, mysql.getUsername());
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, mysql.getPassword());
+
commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, "GaaS_" + UUID.randomUUID().toString());
@@ -210,6 +221,8 @@ public class GobblinServiceRedirectTest {
} catch (Exception e) {
logger.warn("Could not cleanly stop Testing Zookeeper", e);
}
+
+ mysql.stop();
}
@Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepositoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepositoryTest.java
new file mode 100644
index 0000000..86fabfd
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepositoryTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.troubleshooter;
+
+import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.testcontainers.containers.MySQLContainer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Stopwatch;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.TestServiceDatabaseConfig;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
+import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+
+/**
+ * Integration tests for {@link MySqlMultiContextIssueRepository} that run against a real MySQL instance
+ * started through Testcontainers. The container and schema are created once per class; each test method
+ * uses a distinct {@code testId} so that context ids do not collide between tests sharing the database.
+ */
+public class MySqlMultiContextIssueRepositoryTest {
+  private int testId = 1;
+  private MySQLContainer mysql;
+  private ServiceDatabaseProviderImpl databaseProvider;
+  private ServiceDatabaseManager databaseManager;
+  private MySqlMultiContextIssueRepository repository;
+
+  @BeforeMethod
+  public void setup() {
+    // A fresh id per test keeps context ids unique, since all tests share one database.
+    testId++;
+  }
+
+  @BeforeClass
+  public void classSetUp() {
+    mysql = new MySQLContainer("mysql:" + TestServiceDatabaseConfig.MysqlVersion);
+    mysql.start();
+
+    ServiceDatabaseProviderImpl.Configuration dbConfig =
+        ServiceDatabaseProviderImpl.Configuration.builder().url(mysql.getJdbcUrl()).userName(mysql.getUsername())
+            .password(mysql.getPassword()).build();
+
+    databaseProvider = new ServiceDatabaseProviderImpl(dbConfig);
+
+    databaseManager = new ServiceDatabaseManager(databaseProvider);
+    databaseManager.startAsync().awaitRunning();
+    repository = new MySqlMultiContextIssueRepository(databaseProvider);
+  }
+
+  @AfterClass
+  public void classTearDown() {
+    try {
+      databaseManager.stopAsync().awaitTerminated();
+    } finally {
+      // Always release the container, even if the database manager does not shut down cleanly.
+      mysql.stop();
+    }
+  }
+
+  @Test
+  public void canReadEmptyRepository()
+      throws Exception {
+    List<Issue> issues = repository.getAll("test-nonexistent");
+    assertThat(issues).isEmpty();
+  }
+
+  @Test
+  public void canCreateWithEmptyConfiguration()
+      throws Exception {
+    MySqlMultiContextIssueRepository.Configuration configuration =
+        new MySqlMultiContextIssueRepository.Configuration(ConfigFactory.empty());
+
+    MySqlMultiContextIssueRepository newRepo = new MySqlMultiContextIssueRepository(databaseProvider, configuration);
+
+    newRepo.startAsync().awaitRunning();
+    try {
+      List<Issue> issues = newRepo.getAll("test-nonexistent");
+      assertThat(issues).isEmpty();
+    } finally {
+      // Stop the service even when the assertion fails, so it does not outlive the test.
+      newRepo.stopAsync().awaitTerminated();
+    }
+  }
+
+  @Test
+  public void canPutAndGetFullIssue()
+      throws Exception {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("test.prop1", "test value 1");
+    properties.put("test.prop2", "test value 2");
+
+    // Mysql date has less precision than Java date, so we zero sub-second component of the date to get the same
+    // value after retrieval from db
+
+    Issue issue = Issue.builder().summary("Test summary \" ' -- ").code("CODE1")
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.ERROR)
+        .details("details for test issue").exceptionClass("java.io.IOException")
+        .sourceClass("org.apache.gobblin.service.modules.troubleshooter.AutoTroubleshooterLogAppender")
+        .properties(properties).build();
+
+    String contextId = "context-" + testId;
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void canPutAndGetMinimalIssue()
+      throws Exception {
+    Issue issue = Issue.builder().summary("Test summary").code("CODE1")
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN).build();
+
+    String contextId = "context-" + testId;
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void canPutIssueWithMaximumFieldLengths()
+      throws Exception {
+    // summary and details are bounded at 16MB, so we just put reasonably large values there
+    Issue issue = Issue.builder().summary(StringUtils.repeat("s", 100000)).details(StringUtils.repeat("s", 100000))
+        .code(StringUtils.repeat("c", Issue.MAX_ISSUE_CODE_LENGTH))
+        .exceptionClass(StringUtils.repeat("e", Issue.MAX_CLASSNAME_LENGTH))
+        .sourceClass(StringUtils.repeat("s", Issue.MAX_CLASSNAME_LENGTH))
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN).build();
+
+    String contextId = TroubleshooterUtils
+        .getContextIdForJob(StringUtils.repeat("g", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH),
+            StringUtils.repeat("f", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH),
+            String.valueOf(Long.MAX_VALUE),
+            StringUtils.repeat("j", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void willGetMeaningfulErrorOnOversizedData()
+      throws Exception {
+    Issue issue = Issue.builder().summary("Test summary").code(StringUtils.repeat("c", Issue.MAX_ISSUE_CODE_LENGTH * 2))
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN).build();
+
+    String contextId = "context-" + testId;
+
+    assertThatThrownBy(() -> {
+      repository.put(contextId, issue);
+    }).isInstanceOf(TroubleshooterException.class).getRootCause()
+        .hasMessageContaining("Data too long for column 'code'");
+  }
+
+  @Test
+  public void willRollbackWhenSomeIssuesAreInvalid()
+      throws Exception {
+    Issue validIssue = getTestIssue("test", "test1");
+    Issue invalidIssue =
+        Issue.builder().summary("Test summary").code(StringUtils.repeat("c", Issue.MAX_ISSUE_CODE_LENGTH * 2))
+            .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.WARN)
+            .build();
+
+    String contextId = "context-" + testId;
+
+    // The batch containing an oversized issue must fail as a whole; asserting the failure keeps this test
+    // from passing vacuously if the put were to silently succeed.
+    assertThatThrownBy(() -> repository.put(contextId, Arrays.asList(validIssue, invalidIssue)))
+        .isInstanceOf(TroubleshooterException.class);
+
+    // Nothing from the failed batch (including the valid issue) may have been persisted.
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).isEmpty();
+  }
+
+  @Test
+  public void canPutIssueRepeatedly()
+      throws Exception {
+    Issue issue = getTestIssue("test", "test1");
+
+    String contextId = "context-" + testId;
+
+    repository.put(contextId, issue);
+    repository.put(contextId, issue);
+
+    List<Issue> issues = repository.getAll(contextId);
+
+    assertThat(issues).hasSize(1);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue);
+  }
+
+  @Test
+  public void canPutAndGetMultipleIssues()
+      throws Exception {
+    Issue issue1 = getTestIssue("test-1", "code1");
+    Issue issue2 = getTestIssue("test-2", "code2");
+    Issue issue3 = getTestIssue("test-3", "code3");
+    repository.put("context-1-" + testId, issue1);
+    repository.put("context-1-" + testId, issue2);
+
+    repository.put("context-2-" + testId, issue2);
+    repository.put("context-2-" + testId, issue3);
+
+    List<Issue> context1Issues = repository.getAll("context-1-" + testId);
+    assertThat(context1Issues).hasSize(2);
+    assertThat(context1Issues.get(0)).usingRecursiveComparison().isEqualTo(issue1);
+    assertThat(context1Issues.get(1)).usingRecursiveComparison().isEqualTo(issue2);
+
+    List<Issue> context2Issues = repository.getAll("context-2-" + testId);
+    assertThat(context2Issues).hasSize(2);
+    assertThat(context2Issues.get(0)).usingRecursiveComparison().isEqualTo(issue2);
+    assertThat(context2Issues.get(1)).usingRecursiveComparison().isEqualTo(issue3);
+  }
+
+  @Test
+  public void canRemoveIssue()
+      throws Exception {
+    Issue issue1 = getTestIssue("test-1", "code1");
+    Issue issue2 = getTestIssue("test-2", "code2");
+    Issue issue3 = getTestIssue("test-3", "code3");
+
+    String contextId = "context-1-" + testId;
+    repository.put(contextId, issue1);
+    repository.put(contextId, issue2);
+    repository.put(contextId, issue3);
+
+    repository.remove(contextId, issue2.getCode());
+
+    List<Issue> issues = repository.getAll(contextId);
+    assertThat(issues).hasSize(2);
+    assertThat(issues.get(0)).usingRecursiveComparison().isEqualTo(issue1);
+    assertThat(issues.get(1)).usingRecursiveComparison().isEqualTo(issue3);
+  }
+
+  @Test
+  public void willPreserveIssueOrder()
+      throws Exception {
+    // Fixed seed keeps the generated summaries/codes reproducible between runs.
+    Random random = new Random(1);
+
+    List<Issue> issues = new ArrayList<>();
+
+    String contextId = "context-" + testId;
+    for (int i = 0; i < 100; i++) {
+      Issue issue = getTestIssue("test-" + random.nextInt(), "code-" + random.nextInt());
+      issues.add(issue);
+      repository.put(contextId, issue);
+    }
+
+    List<Issue> retrievedIssues = repository.getAll(contextId);
+    assertThat(retrievedIssues).usingRecursiveComparison().isEqualTo(issues);
+  }
+
+  @Test
+  public void canRemoveIssuesAboveSpecifiedCount()
+      throws Exception {
+    String contextId = "context-" + testId;
+    for (int i = 0; i < 100; i++) {
+      Issue issue = getTestIssue("test-" + i, "code-" + i);
+      repository.put(contextId, issue);
+    }
+
+    repository.deleteOldIssuesOverTheCount(20);
+
+    // Only the 20 most recently added issues (code-80 .. code-99) are expected to survive.
+    List<Issue> retrievedIssues = repository.getAll(contextId);
+    assertThat(retrievedIssues).hasSize(20);
+    assertThat(retrievedIssues.get(0).getCode()).isEqualTo("code-80");
+    assertThat(retrievedIssues.get(19).getCode()).isEqualTo("code-99");
+  }
+
+  @Test
+  public void canRemoveOlderIssues()
+      throws Exception {
+    String contextId = "context-" + testId;
+    int issueCount = 100;
+    ZonedDateTime startTime = ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC);
+    // Issue i is (issueCount - i) days old, so ages run from issueCount days down to 1 day.
+    for (int i = 0; i < issueCount; i++) {
+      Issue issue = Issue.builder().summary("test summary").code("code-" + i)
+          .time(startTime.minus(Duration.ofDays(issueCount - i))).severity(IssueSeverity.ERROR).build();
+      repository.put(contextId, issue);
+    }
+
+    // The cutoff sits one hour past 20 days, so exactly the issues aged 1..20 days remain.
+    repository.deleteIssuesOlderThan(startTime.minus(Duration.ofDays(20).plus(Duration.ofHours(1))));
+
+    List<Issue> retrievedIssues = repository.getAll(contextId);
+    assertThat(retrievedIssues).hasSize(20);
+  }
+
+  @Test(enabled = false) // Load test takes several minutes to run and is disabled by default
+  public void canWriteLotsOfIssuesConcurrently()
+      throws Exception {
+    canWriteLotsOfIssuesConcurrently(false);
+  }
+
+  @Test(enabled = false) // Load test takes several minutes to run and is disabled by default
+  public void canWriteLotsOfIssuesConcurrentlyWithBatching()
+      throws Exception {
+    canWriteLotsOfIssuesConcurrently(true);
+  }
+
+  /**
+   * Writes issues from several threads at once, verifies each context reads back what was written and
+   * prints the observed throughput.
+   *
+   * @param useBatching when true, each context's issues are written with a single batch put;
+   *                    otherwise they are written one by one
+   * @throws Exception the first failure raised by any worker thread, including assertion failures
+   */
+  private void canWriteLotsOfIssuesConcurrently(boolean useBatching)
+      throws Exception {
+    int threadCount = 10;
+    int contextsPerThread = 100;
+    int issuesPerContext = 10;
+
+    Stopwatch stopwatch = Stopwatch.createStarted();
+
+    // Collect Throwable rather than Exception: AssertJ failures are AssertionErrors, and anything not
+    // caught here would be captured in the discarded ForkJoinTask and silently lost.
+    ConcurrentHashSet<Throwable> failures = new ConcurrentHashSet<>();
+    ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
+
+    for (int i = 0; i < threadCount; i++) {
+      int threadId = i;
+      forkJoinPool.submit(() -> {
+        try {
+          runLoadTestThread(repository, threadId, contextsPerThread, issuesPerContext, useBatching);
+        } catch (Throwable t) {
+          failures.add(t);
+        }
+      });
+    }
+
+    forkJoinPool.shutdown();
+    assertThat(forkJoinPool.awaitTermination(30, TimeUnit.MINUTES)).isTrue();
+
+    if (!failures.isEmpty()) {
+      Throwable first = failures.iterator().next();
+      if (first instanceof Exception) {
+        throw (Exception) first;
+      }
+      if (first instanceof Error) {
+        throw (Error) first;
+      }
+      throw new AssertionError("Load test thread failed", first);
+    }
+
+    // Read the stopwatch once so the printed duration and the computed speed agree.
+    long elapsedMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    int totalIssues = threadCount * contextsPerThread * issuesPerContext;
+    // Compute from milliseconds and clamp the divisor: whole seconds can be 0 for fast runs,
+    // which previously caused an ArithmeticException.
+    long issuesPerSecond = totalIssues * 1000L / Math.max(1L, elapsedMillis);
+    System.out.printf("Created %d issues in %d ms. Speed: %d issues/second%n", totalIssues, elapsedMillis,
+        issuesPerSecond);
+  }
+
+  /**
+   * Body of one load-test worker: writes {@code issuesPerContext} large issues into each of
+   * {@code contextsPerThread} thread-specific contexts and checks that they read back unchanged.
+   * {@link TroubleshooterException}s are rethrown wrapped in {@link RuntimeException}.
+   */
+  private void runLoadTestThread(MySqlMultiContextIssueRepository repository, int threadNumber, int contextsPerThread,
+      int issuesPerContext, boolean useBatching) {
+    Random random = new Random(threadNumber);
+    try {
+      for (int i = 0; i < contextsPerThread; i++) {
+        String contextId = "load-test-" + testId + "-thread-" + threadNumber + "-context-" + i;
+        List<Issue> issues = new ArrayList<>();
+        for (int j = 0; j < issuesPerContext; j++) {
+          Issue issue = getLargeTestIssue("load-test-1-" + random.nextInt(), "code-" + random.nextInt());
+          issues.add(issue);
+          if (!useBatching) {
+            repository.put(contextId, issue);
+          }
+        }
+        if (useBatching) {
+          repository.put(contextId, issues);
+        }
+        List<Issue> retrievedIssues = repository.getAll(contextId);
+        assertThat(retrievedIssues).usingRecursiveComparison().isEqualTo(issues);
+      }
+    } catch (TroubleshooterException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Builds a small ERROR issue stamped with the current UTC time truncated to whole seconds. */
+  private Issue getTestIssue(String summary, String code) {
+    return Issue.builder().summary(summary).code(code)
+        .time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC)).severity(IssueSeverity.ERROR)
+        .details("test details for " + summary).build();
+  }
+
+  /** Builds an ERROR issue with random multi-kilobyte details, class names and five random properties. */
+  private Issue getLargeTestIssue(String summary, String code) {
+    HashMap<String, String> properties = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      properties.put("test.property" + i, RandomStringUtils.random(100));
+    }
+
+    Issue.IssueBuilder issue = Issue.builder();
+    issue.summary(summary);
+    issue.code(code);
+    issue.time(ZonedDateTime.now().withNano(0).withZoneSameInstant(ZoneOffset.UTC));
+    issue.severity(IssueSeverity.ERROR);
+    issue.details(RandomStringUtils.random(3000));
+    issue.sourceClass(RandomStringUtils.random(100));
+    issue.exceptionClass(RandomStringUtils.random(100));
+    issue.properties(properties);
+
+    return issue.build();
+  }
+}
\ No newline at end of file
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 4a7c480..9bf055c 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -23,6 +23,7 @@ dependencyManagement {
ext.externalDependency = [
"antlrRuntime": "org.antlr:antlr-runtime:3.5.2",
+ "assertj": "org.assertj:assertj-core:3.20.2",
"avro": "org.apache.avro:avro:" + avroVersion,
"avroMapredH2": "org.apache.avro:avro-mapred:" + avroVersion,
"awsCore": "com.amazonaws:aws-java-sdk-core:" + awsVersion,
@@ -35,6 +36,7 @@ ext.externalDependency = [
"commonsCli": "commons-cli:commons-cli:1.3.1",
"commonsCodec": "commons-codec:commons-codec:1.10",
"commonsDbcp": "commons-dbcp:commons-dbcp:1.4",
+ "commonsDbcp2": "org.apache.commons:commons-dbcp2:2.8.0",
"commonsEmail": "org.apache.commons:commons-email:1.4",
"commonsLang": "commons-lang:commons-lang:2.6",
"commonsLang3": "org.apache.commons:commons-lang3:3.4",
@@ -129,7 +131,7 @@ ext.externalDependency = [
"lombok":"org.projectlombok:lombok:1.18.16",
"mockRunnerJdbc":"com.mockrunner:mockrunner-jdbc:1.0.8",
"xerces":"xerces:xercesImpl:2.11.0",
- "typesafeConfig": "com.typesafe:config:1.2.1",
+ "typesafeConfig": "com.typesafe:config:1.4.1",
"byteman": "org.jboss.byteman:byteman:" + bytemanVersion,
"bytemanBmunit": "org.jboss.byteman:byteman-bmunit:" + bytemanVersion,
"bcpgJdk15on": "org.bouncycastle:bcpg-jdk15on:1.52",
@@ -168,7 +170,7 @@ ext.externalDependency = [
"reflections" : "org.reflections:reflections:0.9.10",
"embeddedProcess": "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.2",
"testMysqlServer": "com.wix:wix-embedded-mysql:4.6.1",
- "flyway": "org.flywaydb:flyway-core:3.2.1",
+ "flyway": "org.flywaydb:flyway-core:7.9.2",
"oltu": "org.apache.oltu.oauth2:org.apache.oltu.oauth2.client:1.0.2",
"googleAnalytics": "com.google.apis:google-api-services-analytics:v3-rev134-1.22.0",
"googleDrive": "com.google.apis:google-api-services-drive:v3-rev42-1.22.0",
@@ -204,7 +206,8 @@ ext.externalDependency = [
"org.slf4j:slf4j-log4j12:" + slf4jVersion
],
"postgresConnector": "org.postgresql:postgresql:42.1.4",
- "assertj": 'org.assertj:assertj-core:3.8.0',
+ "testContainers": "org.testcontainers:testcontainers:1.15.3",
+ "testContainersMysql": "org.testcontainers:mysql:1.15.3"
]
if (!isDefaultEnvironment)