You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/10 18:59:33 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-725] add a
mysql based job status retriever
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b28ddaf [GOBBLIN-725] add a mysql based job status retriever
b28ddaf is described below
commit b28ddaf16948dd270b43ac2fc2b27824d4b6a5f8
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Apr 10 11:59:27 2019 -0700
[GOBBLIN-725] add a mysql based job status retriever
Closes #2592 from
arjun4084346/jobstatusstoretomysql
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../metastore/FileContextBasedFsStateStore.java | 1 +
.../metastore/MysqlJobStatusStateStore.java | 58 +++++++++
.../metastore/MysqlJobStatusStateStoreFactory.java | 47 ++++++++
.../org/apache/gobblin/kafka/KafkaTestBase.java | 4 +
.../metrics/kafka/KafkaEventKeyValueReporter.java | 2 +-
.../org/apache/gobblin/service/FlowStatusTest.java | 7 ++
.../service/monitoring/JobStatusRetriever.java | 36 +++++-
gobblin-service/build.gradle | 1 +
.../service/monitoring/FsJobStatusRetriever.java | 49 ++------
.../service/monitoring/KafkaJobStatusMonitor.java | 29 ++++-
.../monitoring/MysqlJobStatusRetriever.java | 103 ++++++++++++++++
.../service/modules/core/GobblinServiceHATest.java | 30 ++---
.../modules/core/GobblinServiceManagerTest.java | 14 +++
.../monitoring/FsJobStatusRetrieverTest.java | 129 +--------------------
...rieverTest.java => JobStatusRetrieverTest.java} | 119 ++++++++-----------
.../monitoring/KafkaAvroJobStatusMonitorTest.java | 8 +-
.../monitoring/MysqlJobStatusRetrieverTest.java | 54 +++++++++
18 files changed, 425 insertions(+), 267 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 75fbc35..3b30c43 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -50,6 +50,7 @@ public class ConfigurationKeys {
public static final String STATE_STORE_TYPE_KEY = "state.store.type";
public static final String DATASET_STATE_STORE_PREFIX = "dataset";
public static final String DATASET_STATE_STORE_TYPE_KEY = DATASET_STATE_STORE_PREFIX + ".state.store.type";
+ public static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
public static final String INTERMEDIATE_STATE_STORE_PREFIX = "intermediate";
public static final String INTERMEDIATE_STATE_STORE_TYPE_KEY = INTERMEDIATE_STATE_STORE_PREFIX + ".state.store.type";
public static final String DEFAULT_STATE_STORE_TYPE = "fs";
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
index ade7a50..1fb74a4 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
@@ -49,6 +49,7 @@ import org.apache.gobblin.util.HadoopUtils;
public class FileContextBasedFsStateStore<T extends State> extends FsStateStore<T> {
private FileContext fc;
+
public FileContextBasedFsStateStore(String fsUri, String storeRootDir, Class stateClass)
throws IOException {
super(fsUri, storeRootDir, stateClass);
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
new file mode 100644
index 0000000..e84df60
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * A {@link MysqlStateStore} specialization for storing job statuses and fetching them by flow execution id.
+ *
+ * @param <T> state object type
+ **/
+public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore {
+ /**
+ * Manages the persistence and retrieval of {@link State} in a MySQL database
+ * @param dataSource the {@link DataSource} object for connecting to MySQL
+ * @param stateStoreTableName the table for storing the state in rows keyed by two levels (store_name, table_name)
+ * @param compressedValues whether values should be compressed for storage
+ * @param stateClass class of the {@link State}s stored in this state store
+ * @throws IOException
+ */
+ public MysqlJobStatusStateStore(DataSource dataSource, String stateStoreTableName, boolean compressedValues,
+ Class<T> stateClass)
+ throws IOException {
+ super(dataSource, stateStoreTableName, compressedValues, stateClass);
+ }
+
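+ /**
+ * Returns the job statuses of one flow execution: the states in {@code storeName} whose table name begins with the flow execution id (the id is passed with a trailing {@code %}, presumably a SQL LIKE wildcard).
+ * @param storeName the store to search; callers in this change build it as "flowGroup.flowName"
+ * @param flowExecutionId the flow execution id used as the table-name prefix
+ * @return the states matching the store name and flow execution id
+ * @throws IOException if the underlying {@code getAll} lookup fails
+ */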
+ public List<T> getAll(String storeName, long flowExecutionId) throws IOException {
+ return getAll(storeName, flowExecutionId + "%", true);
+ }
+
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
new file mode 100644
index 0000000..f571046
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory {
+ @Override
+ public <T extends State> MysqlJobStatusStateStore<T> createStateStore(Config config, Class<T> stateClass) {
+ String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+ boolean compressedValues = ConfigUtils.getBoolean(config, ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY,
+ ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES);
+
+ try {
+ BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ return new MysqlJobStatusStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create MysqlStateStore with factory", e);
+ }
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
index a89f29c..3dd0cd0 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
@@ -247,5 +247,9 @@ public class KafkaTestBase implements Closeable {
public int getKafkaServerPort() {
return _kafkaServerSuite.getKafkaServerPort();
}
+
+ public String getZkConnectString() {
+ return _kafkaServerSuite.getZkConnectString();
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
index cfe0dbc..983643e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
@@ -63,7 +63,7 @@ public class KafkaEventKeyValueReporter extends KafkaEventReporter {
if (nextEvent.getMetadata().containsKey(keyPart)) {
sb.append(nextEvent.getMetadata().get(keyPart));
} else {
- log.error("{} not found in the GobblinTrackingEvent. Setting key to null.", keyPart);
+ log.debug("{} not found in the GobblinTrackingEvent. Setting key to null.", keyPart);
sb = null;
break;
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index f339eb2..c908b83 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -39,6 +39,8 @@ import com.google.inject.name.Names;
import com.linkedin.restli.server.resources.BaseResource;
import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -64,6 +66,11 @@ public class FlowStatusTest {
}
@Override
+ public StateStore<State> getStateStore() {
+ return null;
+ }
+
+ @Override
public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
if (_listOfJobStatusLists == null) {
return null;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 7f60d61..91bfa7e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -22,7 +22,13 @@ import java.util.List;
import com.google.common.collect.Iterators;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.event.TimingEvent;
/**
@@ -32,7 +38,6 @@ import org.apache.gobblin.annotation.Alpha;
public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
public static final String EVENT_NAME_FIELD = "eventName";
public static final String NA_KEY = "NA";
- public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
long flowExecutionId);
@@ -55,4 +60,33 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
return latestExecutionId == -1L ? Iterators.<JobStatus>emptyIterator()
: getJobStatusesForFlowExecution(flowName, flowGroup, latestExecutionId);
}
+
+
+ /**
+ * Deserializes a {@link State} into a {@link JobStatus}.
+ * @param jobState the {@link State} holding the job status properties
+ * @return the {@link JobStatus} built from the properties of {@code jobState}
+ */
+ protected JobStatus getJobStatus(State jobState) {
+ String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+ String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+ String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
+ long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
+ String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
+ String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
+ String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
+ long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
+
+ return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+ jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
+ lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
+ message(message).processedCount(processedCount).build();
+ }
+
+ public abstract StateStore<State> getStateStore();
}
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index d7e8ea2..ab56a3e 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -72,6 +72,7 @@ dependencies {
testCompile project(":gobblin-example")
testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
+ testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
testCompile project(":gobblin-test-utils")
testCompile externalDependency.byteman
testCompile externalDependency.bytemanBmunit
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 8312d69..f35e076 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -67,7 +67,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
try {
List<JobStatus> jobStatuses = new ArrayList<>();
List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
@@ -96,9 +96,8 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");
try {
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
- String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName,
- KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+ String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
List<State> jobStates = this.stateStore.getAll(storeName, tableName);
if (jobStates.isEmpty()) {
return Iterators.emptyIterator();
@@ -122,13 +121,11 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
Preconditions.checkArgument(count > 0, "Number of execution ids must be at least 1.");
try {
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
List<String> tableNames = this.stateStore.getTableNames(storeName, input -> true);
- if (tableNames.isEmpty()) {
- return null;
- }
- Set<Long> flowExecutionIds =
- new TreeSet<>(tableNames.stream().map(this::getExecutionIdFromTableName).collect(Collectors.toList())).descendingSet();
+ Set<Long> flowExecutionIds = new TreeSet<>(tableNames.stream()
+ .map(KafkaJobStatusMonitor::getExecutionIdFromTableName)
+ .collect(Collectors.toList())).descendingSet();
return ImmutableList.copyOf(Iterables.limit(flowExecutionIds, count));
} catch (Exception e) {
return null;
@@ -136,36 +133,6 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
}
/**
- *
- * @param jobState instance of {@link State}
- * @return deserialize {@link State} into a {@link JobStatus}.
- */
- private JobStatus getJobStatus(State jobState) {
- String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
- String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
- long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
- String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
- long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
- String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
- String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
- String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
- long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
-
- return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
- jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
- lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
- message(message).processedCount(processedCount).build();
- }
-
- private long getExecutionIdFromTableName(String tableName) {
- return Long.parseLong(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
- }
-
- /**
* A helper method to determine if {@link JobStatus}es for jobs without a jobGroup/jobName should be filtered out.
* Once a job has been orchestrated, {@link JobStatus}es without a jobGroup/jobName can be filtered out.
* @param tableNames
@@ -174,6 +141,6 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
*/
private boolean shouldFilterJobStatus(List<String> tableNames, String tableName) {
return tableNames.size() > 1 && JobStatusRetriever.NA_KEY
- .equals(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
+ .equals(Splitter.on(KafkaJobStatusMonitor.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 179a107..1675dea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -32,8 +33,10 @@ import kafka.message.MessageAndMetadata;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metastore.MysqlStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -53,16 +56,15 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
//We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
public static final String STATE_STORE_TABLE_SUFFIX = "gst";
+ public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
static final String DEFAULT_JOB_STATUS_MONITOR_CLASS = KafkaAvroJobStatusMonitor.class.getName();
- static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
- private static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
@Getter
private final StateStore<org.apache.gobblin.configuration.State> stateStore;
@@ -73,7 +75,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
- String stateStoreFactoryClass = ConfigUtils.getString(config, STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
+ String stateStoreFactoryClass = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
this.stateStore =
((StateStore.Factory) Class.forName(stateStoreFactoryClass).newInstance()).createStateStore(config, org.apache.gobblin.configuration.State.class);
@@ -125,13 +127,30 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
throws IOException {
String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,JobStatusRetriever.NA_KEY);
String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
- String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
this.stateStore.put(storeName, tableName, jobStatus);
}
+ public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
+ return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+ }
+
+ public static String jobStatusTableName(long flowExecutionId, String jobGroup, String jobName) {
+ return jobStatusTableName(String.valueOf(flowExecutionId), jobGroup, jobName);
+ }
+
+ public static String jobStatusStoreName(String flowGroup, String flowName) {
+ return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ }
+
+ public static long getExecutionIdFromTableName(String tableName) {
+ return Long.parseLong(Splitter.on(STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
+ }
+
public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
new file mode 100644
index 0000000..627e421
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+/**
+ * MySQL-based retriever for {@link JobStatus}es.
+ */
+public class MysqlJobStatusRetriever extends JobStatusRetriever {
+
+ public static final String CONF_PREFIX = "mysqlJobStatusRetriever";
+ @Getter
+ private MysqlJobStatusStateStore<State> stateStore;
+
+ public MysqlJobStatusRetriever(Config config) throws ReflectiveOperationException {
+ config = config.getConfig(CONF_PREFIX).withFallback(config);
+ this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
+ }
+
+ @Override
+ public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+ try {
+ List<State> jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
+ return filterAndGetJobStatuses(jobStatusStates);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId,
+ String jobName, String jobGroup) {
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+ String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
+
+ try {
+ List<State> jobStatusStates = this.stateStore.getAll(storeName, tableName);
+ return filterAndGetJobStatuses(jobStatusStates);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+
+ try {
+ List<State> jobStatusStates = this.stateStore.getAll(storeName);
+ List<Long> flowExecutionIds = jobStatusStates.stream()
+ .map(state -> Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
+ .collect(Collectors.toList());
+ return ImmutableList.copyOf(Iterables.limit(new TreeSet<>(flowExecutionIds).descendingSet(), count));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Iterator<JobStatus> filterAndGetJobStatuses(List<State> jobStatusStates) {
+ int totalJobStatuses = jobStatusStates.size();
+
+ return jobStatusStates.stream()
+ .filter(state -> !(totalJobStatuses > 1 && state.getProp(JobStatusRetriever.EVENT_NAME_FIELD)
+ .equals(ExecutionStatus.COMPILED.name())))
+ .map(this::getJobStatus)
+ .iterator();
+ }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 11b2c62..efb1a6d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -34,23 +34,18 @@ import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.linkedin.data.template.StringMap;
import com.linkedin.restli.client.RestLiResponseException;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -59,7 +54,6 @@ import org.apache.gobblin.util.ConfigUtils;
public class GobblinServiceHATest {
private static final Logger logger = LoggerFactory.getLogger(GobblinServiceHATest.class);
- private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
private static final String QUARTZ_INSTANCE_NAME = "org.quartz.scheduler.instanceName";
private static final String QUARTZ_THREAD_POOL_COUNT = "org.quartz.threadPool.threadCount";
@@ -91,22 +85,11 @@ public class GobblinServiceHATest {
private static final String TEST_FLOW_NAME_2 = "testFlow2";
private static final String TEST_SCHEDULE_2 = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI_2 = "FS:///templates/test.template";
- private static final String TEST_DUMMY_GROUP_NAME_2 = "dummyGroup";
- private static final String TEST_DUMMY_FLOW_NAME_2 = "dummyFlow";
private static final String TEST_GOBBLIN_EXECUTOR_NAME = "testGobblinExecutor";
private static final String TEST_SOURCE_NAME = "testSource";
private static final String TEST_SINK_NAME = "testSink";
- private ServiceBasedAppLauncher serviceLauncher;
- private TopologyCatalog topologyCatalog;
- private TopologySpec topologySpec;
-
- private FlowCatalog flowCatalog;
- private FlowSpec flowSpec;
-
- private Orchestrator orchestrator;
-
private GobblinServiceManager node1GobblinServiceManager;
private FlowConfigClient node1FlowConfigClient;
@@ -133,6 +116,8 @@ public class GobblinServiceHATest {
logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);
+ ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
Properties commonServiceCoreProperties = new Properties();
commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
@@ -148,6 +133,11 @@ public class GobblinServiceHATest {
"org.gobblin.service.InMemorySpecExecutor");
commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
+ commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
Properties node1ServiceCoreProperties = new Properties();
node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 32dc529..efe1b95 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -48,6 +48,10 @@ import com.linkedin.data.template.StringMap;
import com.linkedin.restli.client.RestLiResponseException;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.service.FlowConfig;
@@ -93,8 +97,18 @@ public class GobblinServiceManagerTest {
public void setup() throws Exception {
cleanUpDir(SERVICE_WORK_DIR);
cleanUpDir(SPEC_STORE_PARENT_DIR);
+ ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+ KafkaTestBase kafkaTestHelper = new KafkaTestBase();
+ kafkaTestHelper.startServers();
Properties serviceCoreProperties = new Properties();
+ serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
+ serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
+ serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
+ serviceCoreProperties.put("zookeeper.connect", kafkaTestHelper.getZkConnectString());
+ serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
+
serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index fed5c47..fac2ff0 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -41,140 +41,23 @@ import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
-public class FsJobStatusRetrieverTest {
- private FsJobStatusRetriever jobStatusRetriever;
- private FileContextBasedFsStateStore fsStateStore;
- private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
+public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
- private String flowGroup = "myFlowGroup";
- private String flowName = "myFlowName";
- private String jobGroup;
- private String myJobGroup = "myJobGroup";
- private long jobExecutionId = 1111L;
- private String message = "https://myServer:8143/1234/1111";
+ private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
@BeforeClass
public void setUp() throws Exception {
- cleanUpDir(stateStoreDir);
+ cleanUpDir();
Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir));
this.jobStatusRetriever = new FsJobStatusRetriever(config);
- this.fsStateStore = this.jobStatusRetriever.getStateStore();
- }
-
- private void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
- Properties properties = new Properties();
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
- if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
- this.jobGroup = myJobGroup;
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(this.jobExecutionId));
- properties.setProperty(TimingEvent.METADATA_MESSAGE, this.message);
- properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
- } else {
- this.jobGroup = JobStatusRetriever.NA_KEY;
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
- properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
- }
- properties.setProperty(TimingEvent.METADATA_START_TIME, "1");
- properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
- State jobStatus = new State(properties);
-
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
- String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
-
- this.fsStateStore.put(storeName, tableName, jobStatus);
}
- @Test
- public void testGetJobStatusesForFlowExecution() throws IOException {
- Long flowExecutionId = 1234L;
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
-
- Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
- Assert.assertTrue(jobStatusIterator.hasNext());
- JobStatus jobStatus = jobStatusIterator.next();
- Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
- Assert.assertEquals(jobStatus.getJobName(), (JobStatusRetriever.NA_KEY));
- Assert.assertEquals(jobStatus.getJobGroup(), JobStatusRetriever.NA_KEY);
- Assert.assertEquals(jobStatus.getProcessedCount(), 0);
- Assert.assertEquals(jobStatus.getLowWatermark(), "");
- Assert.assertEquals(jobStatus.getHighWatermark(), "");
-
- addJobStatusToStateStore(flowExecutionId,"myJobName1");
- jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
- jobStatus = jobStatusIterator.next();
- Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
- Assert.assertEquals(jobStatus.getJobName(), "myJobName1");
- Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
- Assert.assertFalse(jobStatusIterator.hasNext());
-
- addJobStatusToStateStore(flowExecutionId,"myJobName2");
- jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
- Assert.assertTrue(jobStatusIterator.hasNext());
- jobStatus = jobStatusIterator.next();
- Assert.assertTrue(jobStatus.getJobName().equals("myJobName1") || jobStatus.getJobName().equals("myJobName2"));
-
- String jobName = jobStatus.getJobName();
- String nextExpectedJobName = ("myJobName1".equals(jobName)) ? "myJobName2" : "myJobName1";
- Assert.assertTrue(jobStatusIterator.hasNext());
- jobStatus = jobStatusIterator.next();
- Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
- }
-
- @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
- public void testGetJobStatusesForFlowExecution1() {
- long flowExecutionId = 1234L;
- String jobName = "myJobName1";
- String jobGroup = "myJobGroup";
- Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
-
- Assert.assertTrue(jobStatusIterator.hasNext());
- JobStatus jobStatus = jobStatusIterator.next();
- Assert.assertEquals(jobStatus.getJobName(), jobName);
- Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
- Assert.assertEquals(jobStatus.getJobExecutionId(), jobExecutionId);
- Assert.assertEquals(jobStatus.getFlowName(), flowName);
- Assert.assertEquals(jobStatus.getFlowGroup(), flowGroup);
- Assert.assertEquals(jobStatus.getFlowExecutionId(), flowExecutionId);
- Assert.assertEquals(jobStatus.getMessage(), message);
- }
-
- @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
- public void testGetLatestExecutionIdsForFlow() throws Exception {
- //Add new flow execution to state store
- long flowExecutionId1 = 1235L;
- addJobStatusToStateStore(flowExecutionId1, "myJobName1");
- long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
- Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
-
- long flowExecutionId2 = 1236L;
- addJobStatusToStateStore(flowExecutionId2, "myJobName1");
-
- //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions i.e. 1235 and 1236.
- List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 2);
- Assert.assertEquals(latestFlowExecutionIds.size(), 2);
- Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) flowExecutionId2);
- Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) flowExecutionId1);
-
- //Remove all flow executions from state store
- cleanUpDir(stateStoreDir);
- Assert.assertNull(this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1));
- Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup), -1L);
- }
-
- private void cleanUpDir(String dir) throws Exception {
- File specStoreDir = new File(dir);
+ @Override
+ protected void cleanUpDir() throws Exception {
+ File specStoreDir = new File(this.stateStoreDir);
if (specStoreDir.exists()) {
FileUtils.deleteDirectory(specStoreDir);
}
}
-
- @AfterClass
- public void tearDown() throws Exception {
- cleanUpDir(stateStoreDir);
- }
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
similarity index 57%
copy from gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
copy to gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index fed5c47..83cb0fe 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -17,65 +17,47 @@
package org.apache.gobblin.service.monitoring;
-import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Joiner;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
-public class FsJobStatusRetrieverTest {
- private FsJobStatusRetriever jobStatusRetriever;
- private FileContextBasedFsStateStore fsStateStore;
- private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
-
- private String flowGroup = "myFlowGroup";
- private String flowName = "myFlowName";
+public abstract class JobStatusRetrieverTest {
+ protected static final String FLOW_GROUP = "myFlowGroup";
+ protected static final String FLOW_NAME = "myFlowName";
private String jobGroup;
- private String myJobGroup = "myJobGroup";
- private long jobExecutionId = 1111L;
- private String message = "https://myServer:8143/1234/1111";
-
- @BeforeClass
- public void setUp() throws Exception {
- cleanUpDir(stateStoreDir);
- Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
- ConfigValueFactory.fromAnyRef(stateStoreDir));
- this.jobStatusRetriever = new FsJobStatusRetriever(config);
- this.fsStateStore = this.jobStatusRetriever.getStateStore();
- }
+ private static final String myJobGroup = "myJobGroup";
+ private static final String MY_JOB_NAME_1 = "myJobName1";
+ private static final String MY_JOB_NAME_2 = "myJobName2";
+ private static final long JOB_EXECUTION_ID = 1111L;
+ private static final String MESSAGE = "https://myServer:8143/1234/1111";
+ JobStatusRetriever jobStatusRetriever;
+
+ abstract void setUp() throws Exception;
- private void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
+ protected void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
Properties properties = new Properties();
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
- properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, FLOW_GROUP);
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, FLOW_NAME);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
- this.jobGroup = myJobGroup;
+ jobGroup = myJobGroup;
properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(this.jobExecutionId));
- properties.setProperty(TimingEvent.METADATA_MESSAGE, this.message);
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(JOB_EXECUTION_ID));
+ properties.setProperty(TimingEvent.METADATA_MESSAGE, MESSAGE);
properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
} else {
- this.jobGroup = JobStatusRetriever.NA_KEY;
+ jobGroup = JobStatusRetriever.NA_KEY;
properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
}
@@ -83,10 +65,10 @@ public class FsJobStatusRetrieverTest {
properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
State jobStatus = new State(properties);
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
- String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME);
+ String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
- this.fsStateStore.put(storeName, tableName, jobStatus);
+ this.jobStatusRetriever.getStateStore().put(storeName, tableName, jobStatus);
}
@Test
@@ -94,7 +76,8 @@ public class FsJobStatusRetrieverTest {
Long flowExecutionId = 1234L;
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
- Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ Iterator<JobStatus>
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
Assert.assertTrue(jobStatusIterator.hasNext());
JobStatus jobStatus = jobStatusIterator.next();
Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
@@ -104,22 +87,22 @@ public class FsJobStatusRetrieverTest {
Assert.assertEquals(jobStatus.getLowWatermark(), "");
Assert.assertEquals(jobStatus.getHighWatermark(), "");
- addJobStatusToStateStore(flowExecutionId,"myJobName1");
- jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
jobStatus = jobStatusIterator.next();
Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
- Assert.assertEquals(jobStatus.getJobName(), "myJobName1");
+ Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
Assert.assertFalse(jobStatusIterator.hasNext());
- addJobStatusToStateStore(flowExecutionId,"myJobName2");
- jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2);
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
Assert.assertTrue(jobStatusIterator.hasNext());
jobStatus = jobStatusIterator.next();
- Assert.assertTrue(jobStatus.getJobName().equals("myJobName1") || jobStatus.getJobName().equals("myJobName2"));
+ Assert.assertTrue(jobStatus.getJobName().equals(MY_JOB_NAME_1) || jobStatus.getJobName().equals(MY_JOB_NAME_2));
String jobName = jobStatus.getJobName();
- String nextExpectedJobName = ("myJobName1".equals(jobName)) ? "myJobName2" : "myJobName1";
+ String nextExpectedJobName = (MY_JOB_NAME_1.equals(jobName)) ? MY_JOB_NAME_2 : MY_JOB_NAME_1;
Assert.assertTrue(jobStatusIterator.hasNext());
jobStatus = jobStatusIterator.next();
Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
@@ -128,53 +111,47 @@ public class FsJobStatusRetrieverTest {
@Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
public void testGetJobStatusesForFlowExecution1() {
long flowExecutionId = 1234L;
- String jobName = "myJobName1";
- String jobGroup = "myJobGroup";
- Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+ Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId,
+ MY_JOB_NAME_1, myJobGroup);
Assert.assertTrue(jobStatusIterator.hasNext());
JobStatus jobStatus = jobStatusIterator.next();
- Assert.assertEquals(jobStatus.getJobName(), jobName);
- Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
- Assert.assertEquals(jobStatus.getJobExecutionId(), jobExecutionId);
- Assert.assertEquals(jobStatus.getFlowName(), flowName);
- Assert.assertEquals(jobStatus.getFlowGroup(), flowGroup);
+ Assert.assertEquals(jobStatus.getJobName(), MY_JOB_NAME_1);
+ Assert.assertEquals(jobStatus.getJobGroup(), myJobGroup);
+ Assert.assertEquals(jobStatus.getJobExecutionId(), JOB_EXECUTION_ID);
+ Assert.assertEquals(jobStatus.getFlowName(), FLOW_NAME);
+ Assert.assertEquals(jobStatus.getFlowGroup(), FLOW_GROUP);
Assert.assertEquals(jobStatus.getFlowExecutionId(), flowExecutionId);
- Assert.assertEquals(jobStatus.getMessage(), message);
+ Assert.assertEquals(jobStatus.getMessage(), MESSAGE);
}
@Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
public void testGetLatestExecutionIdsForFlow() throws Exception {
//Add new flow execution to state store
long flowExecutionId1 = 1235L;
- addJobStatusToStateStore(flowExecutionId1, "myJobName1");
- long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+ addJobStatusToStateStore(flowExecutionId1, MY_JOB_NAME_1);
+ long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP);
Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
long flowExecutionId2 = 1236L;
- addJobStatusToStateStore(flowExecutionId2, "myJobName1");
+ addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1);
//State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 2 executions i.e. 1235 and 1236.
- List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 2);
+ List<Long> latestFlowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 2);
Assert.assertEquals(latestFlowExecutionIds.size(), 2);
Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) flowExecutionId2);
Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) flowExecutionId1);
//Remove all flow executions from state store
- cleanUpDir(stateStoreDir);
- Assert.assertNull(this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1));
- Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup), -1L);
+ cleanUpDir();
+ Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 1).size(), 0);
+ Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP), -1L);
}
- private void cleanUpDir(String dir) throws Exception {
- File specStoreDir = new File(dir);
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
- }
- }
+ abstract void cleanUpDir() throws Exception;
@AfterClass
public void tearDown() throws Exception {
- cleanUpDir(stateStoreDir);
+ cleanUpDir();
}
-}
\ No newline at end of file
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index d34fafd..ad48f34 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -126,9 +126,8 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.processMessage(messageAndMetadata);
StateStore stateStore = jobStatusMonitor.getStateStore();
- String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowGroup, this.flowName);
- String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowExecutionId, "NA", "NA",
- KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+ String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
List<State> stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
State state = stateList.get(0);
@@ -137,8 +136,7 @@ public class KafkaAvroJobStatusMonitorTest {
messageAndMetadata = iterator.next();
jobStatusMonitor.processMessage(messageAndMetadata);
- tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowExecutionId, this.jobGroup, this.jobName,
- KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+ tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
state = stateList.get(0);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
new file mode 100644
index 0000000..45b80a8
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.testng.annotations.BeforeClass;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+
+public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
+ private MysqlJobStatusStateStore dbJobStateStore;
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+
+ @BeforeClass
+ @Override
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
+
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+ configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
+ configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
+ configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
+
+ this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build());
+ this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();
+ cleanUpDir();
+ }
+
+ @Override
+ void cleanUpDir() throws Exception {
+ this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME));
+ }
+}