You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/06/11 16:13:23 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-790]
DagStateStore MySQL
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9dcc79d [GOBBLIN-790] DagStateStore MySQL
9dcc79d is described below
commit 9dcc79dd406bb347b908e5a8709c40ac6ed34489
Author: autumnust <le...@linkedin.com>
AuthorDate: Tue Jun 11 09:13:16 2019 -0700
[GOBBLIN-790] DagStateStore MySQL
Closes #2656 from autumnust/DagStataToStateStore
---
.../apache/gobblin/metastore/MysqlStateStore.java | 73 ++++++---
.../service/modules/orchestration/DagManager.java | 22 ++-
.../modules/orchestration/DagManagerUtils.java | 43 ++++--
.../modules/orchestration/DagStateStore.java | 2 +-
.../modules/orchestration/FSDagStateStore.java | 3 +-
.../modules/orchestration/MysqlDagStateStore.java | 146 ++++++++++++++++++
.../modules/orchestration/DagManagerTest.java | 3 +-
.../modules/orchestration/DagTestUtils.java | 93 ++++++++++++
.../modules/orchestration/FSDagStateStoreTest.java | 64 +-------
.../orchestration/MysqlDagStateStoreTest.java | 167 +++++++++++++++++++++
10 files changed, 517 insertions(+), 99 deletions(-)
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index b7694f6..a833a1d 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -90,6 +90,8 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
private static final String SELECT_JOB_STATE_WITH_LIKE_TEMPLATE =
"SELECT state FROM $TABLE$ WHERE store_name = ? and table_name like ?";
+ private static final String SELECT_ALL_JOBS_STATE = "SELECT state FROM $TABLE$";
+
private static final String SELECT_JOB_STATE_EXISTS_TEMPLATE =
"SELECT 1 FROM $TABLE$ WHERE store_name = ? and table_name = ?";
@@ -123,6 +125,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
private final String UPSERT_JOB_STATE_SQL;
private final String SELECT_JOB_STATE_SQL;
+ private final String SELECT_ALL_JOBS_STATE_SQL;
private final String SELECT_JOB_STATE_WITH_LIKE_SQL;
private final String SELECT_JOB_STATE_EXISTS_SQL;
private final String SELECT_JOB_STATE_NAMES_SQL;
@@ -149,6 +152,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
UPSERT_JOB_STATE_SQL = UPSERT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_SQL = SELECT_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_WITH_LIKE_SQL = SELECT_JOB_STATE_WITH_LIKE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+ SELECT_ALL_JOBS_STATE_SQL = SELECT_ALL_JOBS_STATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_EXISTS_SQL = SELECT_JOB_STATE_EXISTS_TEMPLATE.replace("$TABLE$", stateStoreTableName);
SELECT_JOB_STATE_NAMES_SQL = SELECT_JOB_STATE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
DELETE_JOB_STORE_SQL = DELETE_JOB_STORE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
@@ -335,33 +339,30 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
SELECT_JOB_STATE_WITH_LIKE_SQL : SELECT_JOB_STATE_SQL)) {
queryStatement.setString(1, storeName);
queryStatement.setString(2, tableName);
-
- try (ResultSet rs = queryStatement.executeQuery()) {
- while (rs.next()) {
- Blob blob = rs.getBlob(1);
- Text key = new Text();
-
- try (InputStream is = StreamUtils.isCompressed(blob.getBytes(1, 2)) ?
- new GZIPInputStream(blob.getBinaryStream()) : blob.getBinaryStream();
- DataInputStream dis = new DataInputStream(is)) {
- // keep deserializing while we have data
- while (dis.available() > 0) {
- T state = this.stateClass.newInstance();
- key.readString(dis);
- state.readFields(dis);
- states.add(state);
- }
- } catch (EOFException e) {
- // no more data. GZIPInputStream.available() doesn't return 0 until after EOF.
- }
- }
- }
+ execGetAllStatement(queryStatement, states);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new IOException("failure retrieving state from storeName " + storeName + " tableName " + tableName, e);
}
+ return states;
+ }
+
+ /**
+ * An additional getAll method that retrieves all entries in the table, regardless of store name or table name.
+ *
+ */
+ public List<T> getAll() throws IOException {
+ List<T> states = Lists.newArrayList();
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement queryStatement = connection.prepareStatement(SELECT_ALL_JOBS_STATE_SQL)) {
+ execGetAllStatement(queryStatement, states);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new IOException(String.format("failure retrieving all states with the SQL[%s]", SELECT_ALL_JOBS_STATE_SQL), e);
+ }
return states;
}
@@ -375,6 +376,36 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
return getAll(storeName, "%", true);
}
+ /**
+ * A helper method extracted from the original getAll method. It has side effects:
+ * - Executes the given queryStatement.
+ * - Puts the resulting deserialized states into the provided list.
+ * @throws SQLException
+ * @throws Exception
+ */
+ private void execGetAllStatement(PreparedStatement queryStatement, List<T> states) throws SQLException, Exception {
+ try (ResultSet rs = queryStatement.executeQuery()) {
+ while (rs.next()) {
+ Blob blob = rs.getBlob(1);
+ Text key = new Text();
+
+ try (InputStream is = StreamUtils.isCompressed(blob.getBytes(1, 2)) ?
+ new GZIPInputStream(blob.getBinaryStream()) : blob.getBinaryStream();
+ DataInputStream dis = new DataInputStream(is)) {
+ // keep deserializing while we have data
+ while (dis.available() > 0) {
+ T state = this.stateClass.newInstance();
+ key.readString(dis);
+ state.readFields(dis);
+ states.add(state);
+ }
+ } catch (EOFException e) {
+ // no more data. GZIPInputStream.available() doesn't return 0 until after EOF.
+ }
+ }
+ }
+ }
+
@Override
public List<String> getTableNames(String storeName, Predicate<String> predicate) throws IOException {
List<String> names = Lists.newArrayList();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 8140648..27da869 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -98,7 +98,8 @@ import static org.apache.gobblin.service.ExecutionStatus.valueOf;
public class DagManager extends AbstractIdleService {
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
- private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+ public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+
private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
private static final Integer DEFAULT_NUM_THREADS = 3;
@@ -109,8 +110,6 @@ public class DagManager extends AbstractIdleService {
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
- static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir";
-
/**
* Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
* <ul>
@@ -582,11 +581,20 @@ public class DagManager extends AbstractIdleService {
}
}
- private void cleanUpDag(String dagId) {
- Dag<JobExecutionPlan> dag = this.dags.get(dagId);
- this.dagToJobs.remove(dagId);
- this.dagStateStore.cleanUp(dag);
+ /**
+ * Note that removal of a {@link Dag} entry in {@link #dags} needs to happen after {@link #cleanUp()},
+ * since the real {@link Dag} object is required for {@link #cleanUp()},
+ * and cleaning of all relevant states needs to be atomic.
+ * @param dagId
+ */
+ private synchronized void cleanUpDag(String dagId) {
+ try {
+ this.dagStateStore.cleanUp(dags.get(dagId));
+ } catch (IOException ioe) {
+ log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
+ }
this.dags.remove(dagId);
+ this.dagToJobs.remove(dagId);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 1c6a086..14fc2bc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -30,6 +30,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -38,17 +39,40 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerUtils {
+
+ static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+ Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ return new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ }
+
+ static long getFlowExecId(Dag<JobExecutionPlan> dag) {
+ Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ return jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ }
+
/**
* Generate a dagId from the given {@link Dag} instance.
* @param dag instance of a {@link Dag}.
* @return a String id associated corresponding to the {@link Dag} instance.
*/
static String generateDagId(Dag<JobExecutionPlan> dag) {
- Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
- String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- Long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+ FlowId flowId = getFlowId(dag);
+ Long flowExecutionId = getFlowExecId(dag);
+ return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName(), flowExecutionId);
+ }
+
+ /**
+ * Generate a FlowId from the given {@link Dag} instance.
+ * A FlowId, unlike a DagId, doesn't contain the FlowExecutionId, so different {@link Dag}s could possibly have
+ * the same {@link FlowId}.
+ * @param dag
+ * @return
+ */
+ static String generateFlowIdInString(Dag<JobExecutionPlan> dag) {
+ FlowId flowId = getFlowId(dag);
+ return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName());
}
/**
@@ -57,12 +81,9 @@ public class DagManagerUtils {
* @return fully qualified name of the underlying {@link Dag}.
*/
static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
- Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
- String flowGroup = ConfigUtils.getString(jobConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
- String flowName = ConfigUtils.getString(jobConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
- Long flowExecutionId = ConfigUtils.getLong(jobConfig, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
-
- return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ", flowExecutionId: " + flowExecutionId + ")";
+ FlowId flowid = getFlowId(dag);
+ long flowExecutionId = getFlowExecId(dag);
+ return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
}
static String getJobName(DagNode<JobExecutionPlan> dagNode) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 55ce6f0..cee56c2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -44,7 +44,7 @@ public interface DagStateStore {
* Delete the {@link Dag} from the backing store, typically upon completion of execution.
* @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
*/
- void cleanUp(Dag<JobExecutionPlan> dag);
+ void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
/**
* Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 62789a8..11e5f16 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -50,12 +50,13 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
@Slf4j
public class FSDagStateStore implements DagStateStore {
public static final String DAG_FILE_EXTENSION = ".dag";
+ static final String DAG_STATESTORE_DIR = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreDir";
private final String dagCheckpointDir;
private final GsonSerDe<List<JobExecutionPlan>> serDe;
public FSDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
- this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
+ this.dagCheckpointDir = config.getString(DAG_STATESTORE_DIR);
File checkpointDir = new File(this.dagCheckpointDir);
if (!checkpointDir.exists()) {
if (!checkpointDir.mkdirs()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
new file mode 100644
index 0000000..5e2bd83
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.GsonSerDe;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
+import static org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateFlowIdInString;
+import static org.apache.gobblin.service.modules.orchestration.DagManagerUtils.getFlowExecId;
+
+
+/**
+ * An implementation of {@link DagStateStore} backed by MySQL, leveraging {@link MysqlStateStore}.
+ * It implements the interfaces of {@link DagStateStore} by delegating responsibilities to methods provided
+ * in {@link MysqlStateStore}.
+ * It also implements conversion between {@link Dag<JobExecutionPlan>} and {@link State}.
+ *
+ * The schema of this will simply be:
+ * | storeName | tableName | State |
+ * where storeName represents FlowId, a combination of FlowGroup and FlowName, and tableName represents FlowExecutionId.
+ * State is a pocket for serialized {@link Dag} object.
+ *
+ *
+ */
+public class MysqlDagStateStore implements DagStateStore {
+
+ public static final String CONFIG_PREFIX = GOBBLIN_SERVICE_PREFIX + "mysqlDagStateStore";
+ public static final String DAG_KEY_IN_STATE = "dag";
+
+ /**
+ * The schema of {@link MysqlStateStore} is fixed but the columns are semantically projected into Dag's context:
+ * - The 'storeName' is FlowId.
+ * - The 'tableName' is FlowExecutionId.
+ */
+ private MysqlStateStore<State> mysqlStateStore;
+ private final GsonSerDe<List<JobExecutionPlan>> serDe;
+ private JobExecutionPlanDagFactory jobExecPlanDagFactory;
+
+ public MysqlDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ }
+
+ this.mysqlStateStore = (MysqlStateStore<State>) createStateStore(config);
+
+ JsonSerializer<List<JobExecutionPlan>> serializer = new JobExecutionPlanListSerializer();
+ JsonDeserializer<List<JobExecutionPlan>> deserializer = new JobExecutionPlanListDeserializer(topologySpecMap);
+ Type typeToken = new TypeToken<List<JobExecutionPlan>>() {
+ }.getType();
+ this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
+ this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
+ }
+
+ /**
+ * Creates an instance of the backing StateStore.
+ */
+ protected StateStore<State> createStateStore(Config config) {
+ try {
+ return (MysqlStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
+ } catch (ReflectiveOperationException rfoe) {
+ throw new RuntimeException("A MySQL StateStore cannot be correctly initialized due to:", rfoe);
+ }
+ }
+
+ @Override
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag)
+ throws IOException {
+ mysqlStateStore.put(generateFlowIdInString(dag), getFlowExecId(dag) + "", convertDagIntoState(dag));
+ }
+
+ @Override
+ public void cleanUp(Dag<JobExecutionPlan> dag)
+ throws IOException {
+ mysqlStateStore.delete(generateFlowIdInString(dag), getFlowExecId(dag) + "");
+ }
+
+ @Override
+ public List<Dag<JobExecutionPlan>> getDags()
+ throws IOException {
+ return mysqlStateStore.getAll().stream().map(this::convertStateObjIntoDag).collect(Collectors.toList());
+ }
+
+ /**
+ * For {@link Dag} to work with {@link MysqlStateStore}, it needs to be packaged into a {@link State} object.
+ * The way that it does is simply serialize the {@link Dag} first and use the key {@link #DAG_KEY_IN_STATE}
+ * to be pair with it.
+ *
+ * The serialization step is required for readability and portability of serde lib.
+ * @param dag The dag to be converted.
+ * @return A {@link State} object that contains a single key-value pair for the {@link Dag}.
+ */
+ private State convertDagIntoState(Dag<JobExecutionPlan> dag) {
+ State outputState = new State();
+
+ // Make sure the object has been serialized.
+ List<JobExecutionPlan> jobExecutionPlanList =
+ dag.getNodes().stream().map(Dag.DagNode::getValue).collect(Collectors.toList());
+ outputState.setProp(DAG_KEY_IN_STATE, serDe.serialize(jobExecutionPlanList));
+ return outputState;
+ }
+
+ /**
+ * Get the {@link Dag} out of a {@link State} pocket.
+ */
+ private Dag<JobExecutionPlan> convertStateObjIntoDag(State state) {
+ String serializedJobExecPlanList = state.getProp(DAG_KEY_IN_STATE);
+ return jobExecPlanDagFactory.createDag(serDe.deserialize(serializedJobExecPlanList));
+ }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index a67e5dd..562f8d8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -44,7 +44,6 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
@@ -71,7 +70,7 @@ public class DagManagerTest {
public void setUp() throws Exception {
FileUtils.deleteDirectory(new File(this.dagStateStoreDir));
Config config = ConfigFactory.empty()
- .withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
+ .withValue(FSDagStateStore.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
new file mode 100644
index 0000000..f63c6ad
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+
+public class DagTestUtils {
+ private DagTestUtils() {
+
+ }
+
+ public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
+ String specStoreDir = "/tmp/specStoreDir";
+ Properties properties = new Properties();
+ properties.put("specStore.fs.dir", specStoreDir);
+ properties.put("specExecInstance.capabilities", "source:destination");
+ properties.put("specExecInstance.uri", specUriInString);
+ properties.put("uri",specUriInString);
+
+ Config specExecConfig = ConfigUtils.propertiesToConfig(properties);
+ SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(specExecConfig);
+ TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(new Path(specStoreDir).toUri())
+ .withConfig(specExecConfig)
+ .withDescription("test")
+ .withVersion("1")
+ .withSpecExecutor(specExecutorInstanceProducer);
+
+ return topologySpecBuilder.build();
+ }
+
+ /**
+ * Create a {@link Dag<JobExecutionPlan>} with one parent and one child.
+ * @return a Dag.
+ */
+ public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) throws URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ String suffix = Integer.toString(i);
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
+ if (i > 0) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
+ }
+ JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig).
+ withTemplate(new URI("job" + suffix)).build();
+
+ SpecExecutor specExecutor = buildNaiveTopologySpec("mySpecExecutor").getSpecExecutor();
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
+ jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 8611b66..a38929a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -20,14 +20,11 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -38,17 +35,11 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.util.ConfigUtils;
public class FSDagStateStoreTest {
@@ -64,63 +55,24 @@ public class FSDagStateStoreTest {
throws IOException, URISyntaxException {
this.checkpointDir = new File(dagStateStoreDir);
FileUtils.deleteDirectory(this.checkpointDir);
- Config config = ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(
+ Config config = ConfigFactory.empty().withValue(FSDagStateStore.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(
this.dagStateStoreDir));
this.topologySpecMap = new HashMap<>();
- String specStoreDir = "/tmp/specStoreDir";
- String specExecInstance = "mySpecExecutor";
- Properties properties = new Properties();
- properties.put("specStore.fs.dir", specStoreDir);
- properties.put("specExecInstance.capabilities", "source:destination");
- properties.put("specExecInstance.uri", specExecInstance);
- properties.put("uri",specExecInstance);
-
- Config specExecConfig = ConfigUtils.propertiesToConfig(properties);
- SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(specExecConfig);
- TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(new Path(specStoreDir).toUri())
- .withConfig(specExecConfig)
- .withDescription("test")
- .withVersion("1")
- .withSpecExecutor(specExecutorInstanceProducer);
- this.topologySpec = topologySpecBuilder.build();
- this.specExecURI = new URI(specExecInstance);
+ // Construct the TopologySpec and its map.
+ String specExecInstanceUriInString = "mySpecExecutor";
+ this.topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstanceUriInString);
+ this.specExecURI = new URI(specExecInstanceUriInString);
this.topologySpecMap.put(this.specExecURI, topologySpec);
this._dagStateStore = new FSDagStateStore(config, this.topologySpecMap);
}
- /**
- * Create a {@link Dag<JobExecutionPlan>} with one parent and one child.
- * @return a Dag.
- */
- public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) throws URISyntaxException {
- List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- String suffix = Integer.toString(i);
- Config jobConfig = ConfigBuilder.create().
- addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
- addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
- addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
- addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
- if (i > 0) {
- jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
- }
- JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig).
- withTemplate(new URI("job" + suffix)).build();
- SpecExecutor specExecutor = this.topologySpec.getSpecExecutor();
- JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
- jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
- jobExecutionPlans.add(jobExecutionPlan);
- }
- return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
- }
-
@Test
public void testWriteCheckpoint() throws IOException, URISyntaxException {
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";
- Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId);
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag(flowGroupId, flowExecutionId);
this._dagStateStore.writeCheckpoint(dag);
String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION;
@@ -148,7 +100,7 @@ public class FSDagStateStoreTest {
public void testCleanUp() throws IOException, URISyntaxException {
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";
- Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId);
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag(flowGroupId, flowExecutionId);
this._dagStateStore.writeCheckpoint(dag);
String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION;
File dagFile = new File(this.checkpointDir, fileName);
@@ -164,7 +116,7 @@ public class FSDagStateStoreTest {
List<Long> flowExecutionIds = Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1);
for (int i = 0; i < 2; i++) {
String flowGroupId = Integer.toString(i);
- Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionIds.get(i));
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag(flowGroupId, flowExecutionIds.get(i));
this._dagStateStore.writeCheckpoint(dag);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
new file mode 100644
index 0000000..6ad85d6
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Mainly tests DagStateStore functionality rather than the MySQL-specific components.
+ */
+public class MysqlDagStateStoreTest {
+
+ private DagStateStore _dagStateStore;
+ private Map<URI, TopologySpec> topologySpecMap;
+
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+ private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+
+ @BeforeClass
+ public void setUp() throws Exception {
+
+
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+
+ // Constructing TopologySpecMap.
+ this.topologySpecMap = new HashMap<>();
+ String specExecInstance = "mySpecExecutor";
+ TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+ URI specExecURI = new URI(specExecInstance);
+ this.topologySpecMap.put(specExecURI, topologySpec);
+
+ this._dagStateStore = new TestMysqlDagStateStore(configBuilder.build(), this.topologySpecMap);
+ }
+
+
+ @Test
+ public void testWriteCheckpointAndGetAll() throws Exception{
+ Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
+ Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
+ _dagStateStore.writeCheckpoint(dag_0);
+ _dagStateStore.writeCheckpoint(dag_1);
+
+ List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
+ Assert.assertEquals(dags.size(), 2);
+
+ // Verify dag contents
+ Dag<JobExecutionPlan> dagDeserialized = dags.get(0);
+ Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+ Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+ Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+ Dag.DagNode<JobExecutionPlan> child = dagDeserialized.getEndNodes().get(0);
+ Dag.DagNode<JobExecutionPlan> parent = dagDeserialized.getStartNodes().get(0);
+ Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+ Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+ for (int i = 0; i < 2; i++) {
+ JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+ Config jobConfig = plan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), "group" + "random_0");
+ Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), "flow" + "random_0");
+ Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 123L);
+ Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+ }
+
+ dagDeserialized = dags.get(1);
+ Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+ Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+ Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+ child = dagDeserialized.getEndNodes().get(0);
+ parent = dagDeserialized.getStartNodes().get(0);
+ Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+ Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+ for (int i = 0; i < 2; i++) {
+ JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+ Config jobConfig = plan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), "group" + "random_1");
+ Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), "flow" + "random_1");
+ Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 456L);
+ Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+ }
+ }
+
+ @Test (dependsOnMethods = "testWriteCheckpointAndGetAll")
+ public void testCleanUp() throws Exception {
+ Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
+ Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
+ _dagStateStore.writeCheckpoint(dag_0);
+ _dagStateStore.writeCheckpoint(dag_1);
+
+ List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
+ Assert.assertEquals(dags.size(), 2);
+
+ _dagStateStore.cleanUp(dags.get(0));
+ _dagStateStore.cleanUp(dags.get(1));
+
+ dags = _dagStateStore.getDags();
+ Assert.assertEquals(dags.size(), 0);
+ }
+
+ /**
+ * Only overrides the {@link #createStateStore(Config)} method to directly return a MysqlStateStore
+ * backed by a mocked DB.
+ */
+ public class TestMysqlDagStateStore extends MysqlDagStateStore {
+ public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+ super(config, topologySpecMap);
+ }
+
+ @Override
+ protected StateStore<State> createStateStore(Config config) {
+ try {
+ // Setting up mock DB
+ ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
+ BasicDataSource mySqlDs = new BasicDataSource();
+
+ mySqlDs.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+ mySqlDs.setDefaultAutoCommit(false);
+ mySqlDs.setUrl(jdbcUrl);
+ mySqlDs.setUsername(TEST_USER);
+ mySqlDs.setPassword(TEST_PASSWORD);
+
+ return new MysqlStateStore<>(mySqlDs, TEST_DAG_STATE_STORE, false, State.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
\ No newline at end of file