Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/23 17:04:49 UTC
[gobblin] branch master updated: [GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution (#3558)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bbd18f33c [GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution (#3558)
bbd18f33c is described below
commit bbd18f33c9df0776c837b2df1353270b5719edae
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri Sep 23 10:04:41 2022 -0700
[GOBBLIN-1706] Add DagActionStore to store the action to kill/resume one flow execution (#3558)
* address comments
* use connection manager when httpclient is not closeable
* [GOBBLIN-1706]Add DagActionStore to store the action to kill/resume one flow execution
* add new flow execution handler which uses DagActionStore to persist dag actions and let other hosts get the info
* Make dag manager integrate with the dag action store
* address comments
* address comments
* fix typo and add comments
* [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
* before starting reduce
* after first record is reduced
* after reducing every 1000 records
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
* [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
* [GOBBLIN-1673] Schema for dynamic work unit message
* [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
* [GOBBLIN-1698] Fast fail during work unit generation based on config. (#3542)
* fast fail during work unit generation based on config.
* [GOBBLIN-1690] Added logging to ORC writer
Closes #3543 from rdsr/master
* [GOBBLIN-1678] Refactor git flowgraph component to be extensible (#3536)
* Refactor git flowgraph component to be extensible
* Move files to appropriate modules
* Cleanup and add javadocs
* Cleanup, add missing javadocs
* Address review and import order
* Fix findbugs
* Use java sort instead of collections
* Add GMCE topic explicitly to hive commit event (#3547)
* [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
* address comments
* use connection manager when httpclient is not closeable
* [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
* add orchestrator as listener before service start
* fix code style
* address comments
* fix test case to test orchestrator as one listener of flow spec
* remove unintentional change
* remove unused import
* address comments
* fix typo
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
* fast fail during work unit generation based on config.
Co-authored-by: Meeth Gala <mg...@linkedin.com>
Co-authored-by: Ratandeep <rd...@gmail.com>
Co-authored-by: William Lo <lo...@gmail.com>
Co-authored-by: Jack Moseley <jm...@linkedin.com>
Co-authored-by: Zihan Li <zi...@linkedin.com>
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
* Define basics for collecting Iceberg metadata for the current snapshot (#3559)
* [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (#3554)
* remove jcenter
* Use gradle plugin portal for shadow
* Use maven central in all other cases
* [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (#3551)
* Allow first time failure to authenticate with Azkaban to fail silently
* Fix findbugs report
* Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
* Add handling for fetchSession throwing an exception
* Add logging when fails on constructor and initialization, but continue to local deploy
* Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
* Fixed vars
* Revert changes on azkabanSpecProducer
* clean up error throwing
* revert function checking changes
* Reformat file
* Clean up function
* Format file for try/catch
* Allow first time failure to authenticate with Azkaban to fail silently
* Fix findbugs report
* Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
* Fixed rebase
* Fixed rebase
* Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
* Add whitespace back
* fix helix job wait completion bug when job goes to STOPPING state (#3556)
address comments
update stoppingStateEndTime with currentTime
update test cases
* [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
* before starting reduce
* after first record is reduced
* after reducing every 1000 records
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
* Define basics for collecting Iceberg metadata for the current snapshot
* [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
* [GOBBLIN-1673] Schema for dynamic work unit message
* [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
* Address review comments
* Correct import order
Co-authored-by: Matthew Ho <ho...@gmail.com>
Co-authored-by: Andy Jiang <20...@users.noreply.github.com>
Co-authored-by: Hanghang Nate Liu <na...@gmail.com>
Co-authored-by: umustafi <um...@gmail.com>
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
Co-authored-by: William Lo <lo...@gmail.com>
* [GOBBLIN-1710] Codecov should be optional in CI and not fail Github Actions (#3562)
* [GOBBLIN-1711] Replace Jcenter with maven central (#3566)
* [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do message forwarding (#3549)
* address comments
* use connection manager when httpclient is not closeable
* fix test case to test orchestrator as one listener of flow spec
* remove unintentional change
* [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do message forwarding
* fix compilation error
* address comments
* address comments
* address comments
* update outdated javadoc
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
* [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)
* initial commit for iceberg distcp.
* adding copy entity helper and iceberg distcp template and test case.
* Adding unit tests and refactoring method definitions for an Iceberg dataset.
* resolve conflicts after cleaning history
* update iceberg dataset and finder to include javadoc
* addressed comments on PR and aligned code check style
* renamed vars, added logging and updated javadoc
* update dataset descriptor with ternary operation and rename fs to sourceFs
* added source and target fs and update iceberg dataset finder constructor
* Update source and dest dataset methods as protected and add req args constructor
* change the order of attributes for iceberg dataset finder ctor
* update iceberg dataset methods with correct source and target fs
Co-authored-by: Meeth Gala <mg...@linkedin.com>
* [GOBBLIN-1707] Add `IcebergTableTest` unit test (#3564)
* Add `IcebergTableTest` unit test
* Fixup comment and indentation
* Minor correction of `Long` => `Integer`
* Correct comment
* [GOBBLIN-1711] Replace Jcenter with maven central (#3566)
* Minor rename of local var
Co-authored-by: Matthew Ho <ho...@gmail.com>
* [GOBBLIN-1708] Improve TimeAwareRecursiveCopyableDataset to lookback only into datefolders that match range (#3563)
* Check datetime range validity prior to recursing
* Remove unused packages
* Remove extra line
* Reformat function
* Check string prior to parsing
* removed unused import
* Change checkpathdatetimevalidity to use available localdatetime library parsing functions
* Change to isempty
* Modify check path to be flexible
* Update javadoc
* Add unit tests and refactor
* change bind class as GOBBLIN-1697 get merged
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
Co-authored-by: umustafi <um...@gmail.com>
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
Co-authored-by: Matthew Ho <ho...@gmail.com>
Co-authored-by: meethngala <me...@gmail.com>
Co-authored-by: Meeth Gala <mg...@linkedin.com>
Co-authored-by: Ratandeep <rd...@gmail.com>
Co-authored-by: William Lo <lo...@gmail.com>
Co-authored-by: Jack Moseley <jm...@linkedin.com>
Co-authored-by: Kip Kohn <ck...@linkedin.com>
Co-authored-by: Andy Jiang <20...@users.noreply.github.com>
Co-authored-by: Hanghang Nate Liu <na...@gmail.com>
---
.../apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../java/org/apache/gobblin/MysqlDagStore.java | 2 +-
.../apache/gobblin/runtime/api/DagActionStore.java | 95 +++++++++++
.../dag_action_store/MysqlDagActionStore.java | 174 +++++++++++++++++++++
.../dag_action_store/MysqlDagActionStoreTest.java | 100 ++++++++++++
.../modules/core/GobblinServiceGuiceModule.java | 20 ++-
.../orchestration/AbstractUserQuotaManager.java | 8 +-
.../service/modules/orchestration/DagManager.java | 131 +++++++++++-----
.../modules/orchestration/DagManagerUtils.java | 28 ++--
.../modules/orchestration/FSDagStateStore.java | 2 +-
.../modules/orchestration/MysqlDagStateStore.java | 4 +-
...lowExecutionResourceHandlerWithWarmStandby.java | 95 +++++++++++
.../modules/orchestration/DagManagerFlowTest.java | 51 ++++--
.../modules/orchestration/DagManagerTest.java | 35 +++--
.../modules/orchestration/FSDagStateStoreTest.java | 6 +-
.../orchestration/MysqlDagStateStoreTest.java | 8 +-
16 files changed, 667 insertions(+), 93 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index dbb9ef072..55b8282fe 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -152,6 +152,7 @@ public class ServiceConfigKeys {
public static final int MAX_FLOW_NAME_LENGTH = 128; // defined in FlowId.pdl
public static final int MAX_FLOW_GROUP_LENGTH = 128; // defined in FlowId.pdl
+ public static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13; // length of flowExecutionId which is epoch timestamp
public static final int MAX_JOB_NAME_LENGTH = 374;
public static final int MAX_JOB_GROUP_LENGTH = 374;
public static final String STATE_STORE_TABLE_SUFFIX = "gst";
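A quick sanity check on the value 13, assuming flowExecutionId is an epoch timestamp in milliseconds (the comment only says "epoch timestamp"): its decimal form has exactly 13 digits for any instant from September 2001 until November 2286. The sketch below is illustrative only; the class FlowExecutionIdLength and its digits() helper are hypothetical and not part of Gobblin.

```java
import java.time.Instant;

public class FlowExecutionIdLength {
  // Mirrors the constant added to ServiceConfigKeys in this commit.
  static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;

  /** Number of decimal digits in a non-negative epoch-millisecond timestamp. */
  static int digits(long epochMillis) {
    return Long.toString(epochMillis).length();
  }

  public static void main(String[] args) {
    // Author date of this commit (10:04:41 -0700), expressed in UTC.
    long commitMillis = Instant.parse("2022-09-23T17:04:41Z").toEpochMilli();
    System.out.println(commitMillis + " -> " + digits(commitMillis) + " digits");

    // Smallest and largest 13-digit millisecond values, printed as instants.
    System.out.println(Instant.ofEpochMilli(1_000_000_000_000L)); // September 2001
    System.out.println(Instant.ofEpochMilli(9_999_999_999_999L)); // November 2286

    // A varchar(13) column is therefore wide enough for current timestamps.
    System.out.println(digits(commitMillis) <= MAX_FLOW_EXECUTION_ID_LENGTH);
  }
}
```

If flowExecutionId were ever produced in another unit (seconds or nanoseconds, for example), the column width would need to be revisited.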
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
index e373577b1..18c56b4c9 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
@@ -52,7 +52,7 @@ public class MysqlDagStore<T extends State> extends MysqlStateStore<T> {
protected String getCreateJobStateTableTemplate() {
int maxStoreName = ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER.length()
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
- int maxTableName = 13; // length of flowExecutionId which is epoch timestamp
+ int maxTableName = ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH;
return "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(" + maxStoreName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+ "table_name varchar(" + maxTableName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
new file mode 100644
index 000000000..5da8e6d31
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collection;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+
+public interface DagActionStore {
+ enum DagActionValue {
+ KILL,
+ RESUME
+ }
+
+ @Getter
+ @EqualsAndHashCode
+ class DagAction {
+ String flowGroup;
+ String flowName;
+ String flowExecutionId;
+ DagActionValue dagActionValue;
+ public DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
+ this.flowGroup = flowGroup;
+ this.flowName = flowName;
+ this.flowExecutionId = flowExecutionId;
+ this.dagActionValue = dagActionValue;
+ }
+ }
+
+
+ /**
+ * Check if an action exists in dagAction store by flow group, flow name and flow execution id.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @throws IOException
+ */
+ boolean exists(String flowGroup, String flowName, String flowExecutionId) throws IOException, SQLException;
+
+ /**
+ * Persist the dag action in {@link DagActionStore} for durability
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param dagActionValue the value of the dag action
+ * @throws IOException
+ */
+ void addDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException;
+
+ /**
+ * Delete the dag action from {@link DagActionStore}
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @throws IOException
+ * @return true if we successfully delete one record, return false if the record does not exist
+ */
+ boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException;
+
+ /**
+ * Retrieve action value by the flow group, flow name and flow execution id from the {@link DagActionStore}.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @throws IOException Exception in retrieving the {@link DagAction}.
+ * @throws SpecNotFoundException If {@link DagAction} being retrieved is not present in store.
+ */
+ DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException, SpecNotFoundException,
+ SQLException;
+
+ /**
+ * Get all {@link DagAction}s from the {@link DagActionStore}.
+ * @throws IOException Exception in retrieving {@link DagAction}s.
+ */
+ Collection<DagAction> getDagActions() throws IOException;
+
+}
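To make the contract of the interface above concrete, here is a minimal in-memory sketch of the same five operations. It is not part of this commit: it does not implement the real org.apache.gobblin.runtime.api.DagActionStore (so it compiles without Gobblin or Lombok on the classpath), and the names InMemoryDagActionStoreSketch and InMemoryStore are hypothetical. It assumes, as the MySQL implementation and its test in this commit suggest, that at most one action may exist per (flowGroup, flowName, flowExecutionId), that adding a second one fails with an IOException, and that a lookup of a missing action returns null.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryDagActionStoreSketch {

  public enum DagActionValue { KILL, RESUME }

  /** Value object mirroring DagActionStore.DagAction, with hand-written equals/hashCode instead of Lombok. */
  public static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final DagActionValue dagActionValue;

    public DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.dagActionValue = dagActionValue;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof DagAction)) {
        return false;
      }
      DagAction other = (DagAction) o;
      return Objects.equals(flowGroup, other.flowGroup) && Objects.equals(flowName, other.flowName)
          && Objects.equals(flowExecutionId, other.flowExecutionId) && dagActionValue == other.dagActionValue;
    }

    @Override
    public int hashCode() {
      return Objects.hash(flowGroup, flowName, flowExecutionId, dagActionValue);
    }
  }

  /** Hypothetical store keyed by (flowGroup, flowName, flowExecutionId), like the table's primary key. */
  public static final class InMemoryStore {
    private final Map<List<String>, DagAction> actions = new ConcurrentHashMap<>();

    private static List<String> key(String flowGroup, String flowName, String flowExecutionId) {
      return Arrays.asList(flowGroup, flowName, flowExecutionId);
    }

    public boolean exists(String flowGroup, String flowName, String flowExecutionId) {
      return actions.containsKey(key(flowGroup, flowName, flowExecutionId));
    }

    public void addDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue value)
        throws IOException {
      DagAction previous = actions.putIfAbsent(key(flowGroup, flowName, flowExecutionId),
          new DagAction(flowGroup, flowName, flowExecutionId, value));
      if (previous != null) {
        // Stands in for the primary-key violation surfaced as IOException by the MySQL store.
        throw new IOException("Dag action already exists for " + flowGroup + "/" + flowName + "/" + flowExecutionId);
      }
    }

    /** Returns true if a record was removed, false if none existed. */
    public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) {
      return actions.remove(key(flowGroup, flowName, flowExecutionId)) != null;
    }

    /** Returns null when no action is stored for the key. */
    public DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) {
      return actions.get(key(flowGroup, flowName, flowExecutionId));
    }

    public Collection<DagAction> getDagActions() {
      return new ArrayList<>(actions.values());
    }
  }

  public static void main(String[] args) throws IOException {
    InMemoryStore store = new InMemoryStore();
    store.addDagAction("group", "flow", "1663952681000", DagActionValue.KILL);
    System.out.println(store.exists("group", "flow", "1663952681000"));          // true
    System.out.println(store.deleteDagAction("group", "flow", "1663952681000")); // true
    System.out.println(store.deleteDagAction("group", "flow", "1663952681000")); // false
  }
}
```

A map keyed on the three identifying fields keeps the "one pending action per flow execution" rule explicit; a store shared across hosts would still need a durable backend such as the MySQL table introduced in this commit.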
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
new file mode 100644
index 000000000..7600e304f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.dag_action_store;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashSet;
+import javax.sql.DataSource;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlDagActionStore implements DagActionStore {
+
+ public static final String CONFIG_PREFIX = "MysqlDagActionStore";
+
+
+ protected final DataSource dataSource;
+ private final String tableName;
+ private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?)";
+
+ protected static final String INSERT_STATEMENT = "INSERT INTO %s (flow_group, flow_name, flow_execution_id, dag_action ) "
+ + "VALUES (?, ?, ?, ?)";
+ private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?";
+ private static final String GET_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?";
+ private static final String GET_ALL_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, dag_action FROM %s";
+ private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" +
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + ") NOT NULL, "
+ + "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+ + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id))";
+
+ @Inject
+ public MysqlDagActionStore(Config config) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for MysqlDagActionStore");
+ }
+ this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+
+ this.dataSource = MysqlStateStore.newDataSource(config);
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Failure creating table " + tableName, e);
+ }
+ }
+
+ @Override
+ public boolean exists(String flowGroup, String flowName, String flowExecutionId) throws IOException, SQLException {
+ ResultSet rs = null;
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement existStatement = connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ int i = 0;
+ existStatement.setString(++i, flowGroup);
+ existStatement.setString(++i, flowName);
+ existStatement.setString(++i, flowExecutionId);
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+ tableName, flowGroup, flowName, flowExecutionId), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ }
+ }
+
+ @Override
+ public void addDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue)
+ throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement insertStatement = connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ int i = 0;
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, dagActionValue.toString());
+ insertStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure to add action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+ tableName, flowGroup, flowName, flowExecutionId), e);
+ }
+ }
+
+ @Override
+ public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ int i = 0;
+ deleteStatement.setString(++i, flowGroup);
+ deleteStatement.setString(++i, flowName);
+ deleteStatement.setString(++i, flowExecutionId);
+ int result = deleteStatement.executeUpdate();
+ connection.commit();
+ return result != 0;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure to delete action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+ tableName, flowGroup, flowName, flowExecutionId), e);
+ }
+ }
+
+ @Override
+ public DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId)
+ throws IOException, SQLException {
+ ResultSet rs = null;
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement getStatement = connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
+ int i = 0;
+ getStatement.setString(++i, flowGroup);
+ getStatement.setString(++i, flowName);
+ getStatement.setString(++i, flowExecutionId);
+ rs = getStatement.executeQuery();
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), DagActionValue.valueOf(rs.getString(4)));
+ }
+ return null;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure to get dag action from table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
+ tableName, flowGroup, flowName, flowExecutionId), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ }
+ }
+
+ @Override
+ public Collection<DagAction> getDagActions() throws IOException {
+ HashSet<DagAction> result = new HashSet<>();
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement getAllStatement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
+ ResultSet rs = getAllStatement.executeQuery()) {
+ while (rs.next()) {
+ result.add(
+ new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), DagActionValue.valueOf(rs.getString(4))));
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure to get dag actions from table %s", tableName), e);
+ }
+ }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
new file mode 100644
index 000000000..0c65f2241
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.dag_action_store;
+
+import java.io.IOException;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlDagActionStoreTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "dag_action_store";
+ private static final String flowGroup = "testFlowGroup";
+ private static final String flowName = "testFlowName";
+ private static final String flowExecutionId = "12345677";
+ private static final String flowExecutionId_2 = "12345678";
+ private static final String flowExecutionId_3 = "12345679";
+ private MysqlDagActionStore mysqlDagActionStore;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.mysqlDagActionStore = new MysqlDagActionStore(config);
+ }
+
+ @Test
+ public void testAddAction() throws Exception {
+ this.mysqlDagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
+ // Should not be able to add again when the previous one exists
+ Assert.expectThrows(IOException.class,
+ () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.RESUME));
+ // Should be able to add one that does not exist yet
+ this.mysqlDagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
+ }
+
+ @Test(dependsOnMethods = "testAddAction")
+ public void testExists() throws Exception {
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2));
+ Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_3));
+ }
+
+ @Test(dependsOnMethods = "testExists")
+ public void testGetAction() throws IOException, SQLException {
+ Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL), this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId));
+ Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME), this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId_2));
+ Collection<DagActionStore.DagAction> dagActions = this.mysqlDagActionStore.getDagActions();
+ Assert.assertEquals(dagActions.size(), 2);
+ HashSet<DagActionStore.DagAction> set = new HashSet<>();
+ set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL));
+ set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME));
+ Assert.assertEquals(dagActions, set);
+ }
+
+ @Test(dependsOnMethods = "testGetAction")
+ public void testDeleteAction() throws IOException, SQLException {
+ this.mysqlDagActionStore.deleteDagAction(flowGroup, flowName, flowExecutionId);
+ Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 1);
+ Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2));
+ Assert.assertNull(this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId));
+ }
+
+}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 310073a8c..6b9645f19 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -19,7 +19,12 @@ package org.apache.gobblin.service.modules.core;
import java.util.Objects;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+//import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
@@ -139,10 +144,17 @@ public class GobblinServiceGuiceModule implements Module {
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
.to(serviceConfig.isWarmStandbyEnabled());
-
- binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
- binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
- binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
+ OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
+ if (serviceConfig.isWarmStandbyEnabled()) {
+ binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
+ binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
+ binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.class);
+ binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.class);
+ } else {
+ binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
+ binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
+ binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
+ }
binder.bind(FlowConfigsResource.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index 8d6cab041..fad079bf0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -93,16 +93,16 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
decrementQuotaUsageForUsers(usersQuotaIncrement);
decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
- runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+ runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
}
protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
// Dag is already being tracked, no need to double increment for retries and multihop flows
- if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode))) {
+ if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode).toString())) {
return quotaCheck;
} else {
- runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+ runningDagIds.add(DagManagerUtils.generateDagId(dagNode).toString());
}
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
@@ -169,7 +169,7 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
* Returns true if the dag existed in the set of running dags and was removed successfully
*/
public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
- boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+ boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
if (!val) {
return false;
}
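Editorial note on the hunk above: a minimal sketch, not code from the commit, of why every lookup now appends `.toString()`. It assumes `runningDagIds` is a `Set<String>` (its declaration is not shown in this diff). `DagIdLookupSketch` and its nested `DagId` are hypothetical stand-ins for `DagManager.DagId`. Because `Set.contains` and `Set.remove` accept any `Object`, passing the id object itself would still compile but would never match a stored string.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for DagManager.DagId; not part of the commit.
final class DagIdLookupSketch {

  static final class DagId {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;

    DagId(String flowGroup, String flowName, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof DagId)) {
        return false;
      }
      DagId other = (DagId) o;
      return flowGroup.equals(other.flowGroup)
          && flowName.equals(other.flowName)
          && flowExecutionId.equals(other.flowExecutionId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(flowGroup, flowName, flowExecutionId);
    }

    @Override
    public String toString() {
      // Same underscore-joined form that the diff builds with Joiner.on("_").
      return String.join("_", flowGroup, flowName, flowExecutionId);
    }
  }

  /** Looks the id up by its String form, as the patched code does. */
  static boolean isTracked(Set<String> runningDagIds, DagId id) {
    return runningDagIds.contains(id.toString());
  }

  /** Compiles because contains() takes Object, but a DagId never equals a String. */
  static boolean isTrackedWithoutToString(Set<String> runningDagIds, DagId id) {
    Object key = id;
    return runningDagIds.contains(key);
  }

  public static void main(String[] args) {
    Set<String> running = new HashSet<>();
    DagId id = new DagId("group", "flow", "123");
    running.add(id.toString());
    System.out.println("with toString():    " + isTracked(running, id));
    System.out.println("without toString(): " + isTrackedWithoutToString(running, id));
  }
}
```

The second lookup fails silently rather than at compile time, which is why a missed `.toString()` in quota tracking would be easy to overlook.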
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3543c0e16..fcb2ff4c1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,9 +18,13 @@
package org.apache.gobblin.service.modules.orchestration;
import com.codahale.metrics.Meter;
+import com.google.common.base.Joiner;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,8 +53,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigException;
-import javax.inject.Inject;
-import javax.inject.Singleton;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -62,6 +65,7 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -156,9 +160,27 @@ public class DagManager extends AbstractIdleService {
}
}
+ @Getter
+ @EqualsAndHashCode
+ public static class DagId {
+ String flowGroup;
+ String flowName;
+ String flowExecutionId;
+ public DagId(String flowGroup, String flowName, String flowExecutionId) {
+ this.flowGroup = flowGroup;
+ this.flowName = flowName;
+ this.flowExecutionId = flowExecutionId;
+ }
+
+ @Override
+ public String toString() {
+ return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+ }
+ }
+
private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
- private final BlockingQueue<String>[] cancelQueue;
- private final BlockingQueue<String>[] resumeQueue;
+ private final BlockingQueue<DagId>[] cancelQueue;
+ private final BlockingQueue<DagId>[] resumeQueue;
DagManagerThread[] dagManagerThreads;
private final ScheduledExecutorService scheduledExecutorPool;
@@ -177,15 +199,16 @@ public class DagManager extends AbstractIdleService {
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
private final DagManagerMetrics dagManagerMetrics;
+ private final Optional<DagActionStore> dagActionStore;
private volatile boolean isActive = false;
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) initializeDagQueue(this.numThreads);
- this.cancelQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
- this.resumeQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
+ this.cancelQueue = (BlockingQueue<DagId>[]) initializeDagQueue(this.numThreads);
+ this.resumeQueue = (BlockingQueue<DagId>[]) initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
@@ -197,6 +220,7 @@ public class DagManager extends AbstractIdleService {
} else {
this.eventSubmitter = Optional.absent();
}
+ this.dagActionStore = dagActionStore;
this.dagManagerMetrics = new DagManagerMetrics(metricContext);
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -224,9 +248,9 @@ public class DagManager extends AbstractIdleService {
return queue;
}
- @Inject
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
- this(config, jobStatusRetriever, true);
+ @Inject(optional = true)
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore) {
+ this(config, jobStatusRetriever, dagActionStore, true);
}
/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
@@ -295,7 +319,7 @@ public class DagManager extends AbstractIdleService {
*/
private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
- String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
+ DagId dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
if (!this.cancelQueue[queueId].offer(dagId)) {
throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
}
@@ -303,21 +327,27 @@ public class DagManager extends AbstractIdleService {
@Subscribe
public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
- log.info("Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
- try {
- killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
- } catch (IOException e) {
- log.warn("Failed to kill flow", e);
+ if (isActive) {
+ log.info("Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(),
+ killFlowEvent.getFlowExecutionId());
+ try {
+ killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
+ } catch (IOException e) {
+ log.warn("Failed to kill flow", e);
+ }
}
}
@Subscribe
public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
- log.info("Received resume request for flow ({}, {}, {})", resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
- String dagId = DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
- int queueId = DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), this.numThreads);
- if (!this.resumeQueue[queueId].offer(dagId)) {
- log.warn("Could not add dag " + dagId + " to resume queue");
+ if (isActive) {
+ log.info("Received resume request for flow ({}, {}, {})", resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
+ DagId dagId = DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(),
+ resumeFlowEvent.getFlowExecutionId());
+ int queueId = DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), this.numThreads);
+ if (!this.resumeQueue[queueId].offer(dagId)) {
+ log.warn("Could not add dag " + dagId + " to resume queue");
+ }
}
}
@@ -356,7 +386,7 @@ public class DagManager extends AbstractIdleService {
//On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
- DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
+ DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore, dagActionStore,
runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
this.defaultJobStartSlaTimeMillis, quotaManager, i);
this.dagManagerThreads[i] = dagManagerThread;
@@ -369,6 +399,21 @@ public class DagManager extends AbstractIdleService {
for (Dag<JobExecutionPlan> dag : dags) {
addDag(dag, false, false);
}
+ if (dagActionStore.isPresent()) {
+ Collection<DagActionStore.DagAction> dagActions = dagActionStore.get().getDagActions();
+ for (DagActionStore.DagAction action : dagActions) {
+ switch (action.getDagActionValue()) {
+ case KILL:
+ this.handleKillFlowEvent(new KillFlowEvent(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId())));
+ break;
+ case RESUME:
+ this.handleResumeFlowEvent(new ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId())));
+ break;
+ default:
+ log.warn("Unsupported dagAction: " + action.getDagActionValue().toString());
+ }
+ }
+ }
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
this.scheduledExecutorPool.shutdown();
@@ -413,16 +458,17 @@ public class DagManager extends AbstractIdleService {
private final DagStateStore dagStateStore;
private final DagStateStore failedDagStateStore;
private final BlockingQueue<Dag<JobExecutionPlan>> queue;
- private final BlockingQueue<String> cancelQueue;
- private final BlockingQueue<String> resumeQueue;
+ private final BlockingQueue<DagId> cancelQueue;
+ private final BlockingQueue<DagId> resumeQueue;
private final Long defaultJobStartSlaTimeMillis;
+ private final Optional<DagActionStore> dagActionStore;
private final Optional<Meter> dagManagerThreadHeartbeat;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
- BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
+ Optional<DagActionStore> dagActionStore, BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<DagId> cancelQueue,
+ BlockingQueue<DagId> resumeQueue, boolean instrumentationEnabled, Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
@@ -434,6 +480,7 @@ public class DagManager extends AbstractIdleService {
this.dagManagerMetrics = dagManagerMetrics;
this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
this.quotaManager = quotaManager;
+ this.dagActionStore = dagActionStore;
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
@@ -459,7 +506,7 @@ public class DagManager extends AbstractIdleService {
@Override
public void run() {
try {
- String nextDagToCancel = cancelQueue.poll();
+ DagId nextDagToCancel = cancelQueue.poll();
//Poll the cancelQueue for a new Dag to cancel.
if (nextDagToCancel != null) {
cancelDag(nextDagToCancel);
@@ -478,7 +525,7 @@ public class DagManager extends AbstractIdleService {
}
while (!resumeQueue.isEmpty()) {
- String dagId = resumeQueue.poll();
+ DagId dagId = resumeQueue.poll();
beginResumingDag(dagId);
}
@@ -498,18 +545,27 @@ public class DagManager extends AbstractIdleService {
}
}
+ private void clearUpDagAction(DagId dagId) throws IOException {
+ if (this.dagActionStore.isPresent()) {
+ this.dagActionStore.get().deleteDagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId);
+ }
+ }
+
/**
* Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
* and also sending events so that this status will be reflected in the job status state store.
*/
- private void beginResumingDag(String dagId) throws IOException {
+ private void beginResumingDag(DagId dagIdToResume) throws IOException {
+ String dagId = dagIdToResume.toString();
if (!this.failedDagIds.contains(dagId)) {
log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
+ clearUpDagAction(dagIdToResume);
return;
}
Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
if (dag == null) {
log.error("Dag " + dagId + " was found in memory but not found in failed dag state store");
+ clearUpDagAction(dagIdToResume);
return;
}
@@ -560,6 +616,7 @@ public class DagManager extends AbstractIdleService {
if (dagReady) {
this.dagStateStore.writeCheckpoint(dag.getValue());
this.failedDagStateStore.cleanUp(dag.getValue());
+ clearUpDagAction(DagManagerUtils.generateDagId(dag.getValue()));
this.failedDagIds.remove(dag.getKey());
this.resumingDags.remove(dag.getKey());
initialize(dag.getValue());
@@ -573,7 +630,8 @@ public class DagManager extends AbstractIdleService {
* @throws ExecutionException executionException
* @throws InterruptedException interruptedException
*/
- private void cancelDag(String dagToCancel) throws ExecutionException, InterruptedException {
+ private void cancelDag(DagId dagId) throws ExecutionException, InterruptedException, IOException {
+ String dagToCancel = dagId.toString();
log.info("Cancel flow with DagId {}", dagToCancel);
if (this.dagToJobs.containsKey(dagToCancel)) {
List<DagNode<JobExecutionPlan>> dagNodesToCancel = this.dagToJobs.get(dagToCancel);
@@ -587,6 +645,7 @@ public class DagManager extends AbstractIdleService {
} else {
log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", dagToCancel);
}
+ clearUpDagAction(dagId);
}
private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) throws ExecutionException, InterruptedException {
@@ -615,7 +674,7 @@ public class DagManager extends AbstractIdleService {
private void initialize(Dag<JobExecutionPlan> dag)
throws IOException {
//Add Dag to the map of running dags
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
if (this.dags.containsKey(dagId)) {
log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
@@ -730,7 +789,7 @@ public class DagManager extends AbstractIdleService {
}
for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
- String dagId = DagManagerUtils.generateDagId(dagNode);
+ String dagId = DagManagerUtils.generateDagId(dagNode).toString();
deleteJobState(dagId, dagNode);
}
}
@@ -759,7 +818,7 @@ public class DagManager extends AbstractIdleService {
dagManagerMetrics.incrementCountsStartSlaExceeded(node);
cancelDagNode(node);
- String dagId = DagManagerUtils.generateDagId(node);
+ String dagId = DagManagerUtils.generateDagId(node).toString();
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
return true;
@@ -792,7 +851,7 @@ public class DagManager extends AbstractIdleService {
private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
long flowStartTime = DagManagerUtils.getFlowStartTime(node);
long currentTime = System.currentTimeMillis();
- String dagId = DagManagerUtils.generateDagId(node);
+ String dagId = DagManagerUtils.generateDagId(node).toString();
long flowSla;
if (dagToSLA.containsKey(dagId)) {
@@ -924,7 +983,7 @@ public class DagManager extends AbstractIdleService {
Future<?> addSpecFuture = producer.addSpec(jobSpec);
dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
//Persist the dag
- this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
+ this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode).toString()));
addSpecFuture.get();
@@ -957,7 +1016,7 @@ public class DagManager extends AbstractIdleService {
private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
throws IOException {
Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
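Editorial note on the DagManager changes above: a simplified sketch, under stated assumptions, of the replay-and-clean-up pattern the diff introduces. On activation, stored actions are turned back into kill/resume requests; once a request is handled, its entry is deleted from the store. Everything here is hypothetical (`DagActionReplaySketch`, `InMemoryActionStore`, `Action`, string keys instead of `DagId`, one queue pair instead of per-thread queues), and it deletes every entry as soon as it is polled, whereas the real resume path clears the entry only when the dag is missing or ready to run again.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; none of these types exist in Gobblin.
final class DagActionReplaySketch {

  enum ActionValue { KILL, RESUME }

  static final class Action {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final ActionValue value;

    Action(String flowGroup, String flowName, String flowExecutionId, ActionValue value) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.value = value;
    }

    String key() {
      return String.join("_", flowGroup, flowName, flowExecutionId);
    }
  }

  /** In-memory stand-in for a persistent store; assumes one action per flow execution. */
  static final class InMemoryActionStore {
    private final Map<String, Action> actions = new LinkedHashMap<>();

    void add(Action action) {
      actions.put(action.key(), action);
    }

    void delete(String key) {
      actions.remove(key);
    }

    Collection<Action> getAll() {
      return new ArrayList<>(actions.values());
    }
  }

  final Queue<String> cancelQueue = new LinkedBlockingQueue<>();
  final Queue<String> resumeQueue = new LinkedBlockingQueue<>();
  private final InMemoryActionStore store;

  DagActionReplaySketch(InMemoryActionStore store) {
    this.store = store;
  }

  /** Called when this host becomes active: re-enqueue everything still in the store. */
  void replayStoredActions() {
    for (Action action : store.getAll()) {
      switch (action.value) {
        case KILL:
          cancelQueue.offer(action.key());
          break;
        case RESUME:
          resumeQueue.offer(action.key());
          break;
        default:
          break;
      }
    }
  }

  /** One worker pass: drain both queues and delete each handled entry from the store. */
  int processOnce() {
    int handled = 0;
    String key;
    while ((key = cancelQueue.poll()) != null) {
      store.delete(key);
      handled++;
    }
    while ((key = resumeQueue.poll()) != null) {
      store.delete(key);
      handled++;
    }
    return handled;
  }

  public static void main(String[] args) {
    InMemoryActionStore store = new InMemoryActionStore();
    store.add(new Action("group", "flow", "1", ActionValue.KILL));
    store.add(new Action("group", "flow", "2", ActionValue.RESUME));
    DagActionReplaySketch manager = new DagActionReplaySketch(store);
    manager.replayStoredActions();
    System.out.println("handled: " + manager.processOnce());
    System.out.println("left in store: " + store.getAll().size());
  }
}
```

Deleting only after the request has been handled means an action written by one host survives a failover: whichever host becomes active next finds it in the store and replays it.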
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index dc79524b3..d6fb0c99f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -29,7 +29,6 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
@@ -83,29 +82,34 @@ public class DagManagerUtils {
return Long.parseLong(dagId.substring(dagId.lastIndexOf('_') + 1));
}
+
/**
- * Generate a dagId from the given {@link Dag} instance.
+ * Generate a dagId object from the given {@link Dag} instance.
* @param dag instance of a {@link Dag}.
- * @return a String id associated corresponding to the {@link Dag} instance.
+ * @return a DagId object corresponding to the {@link Dag} instance.
*/
- static String generateDagId(Dag<JobExecutionPlan> dag) {
+ static DagManager.DagId generateDagId(Dag<JobExecutionPlan> dag) {
return generateDagId(dag.getStartNodes().get(0).getValue().getJobSpec().getConfig());
}
- static String generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) {
- return generateDagId(dagNode.getValue().getJobSpec().getConfig());
- }
-
- private static String generateDagId(Config jobConfig) {
+ private static DagManager.DagId generateDagId(Config jobConfig) {
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- return generateDagId(flowGroup, flowName, flowExecutionId);
+ return new DagManager.DagId(flowGroup, flowName, String.valueOf(flowExecutionId));
+ }
+
+ static DagManager.DagId generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) {
+ return generateDagId(dagNode.getValue().getJobSpec().getConfig());
+ }
+
+ static DagManager.DagId generateDagId(String flowGroup, String flowName, long flowExecutionId) {
+ return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
}
- static String generateDagId(String flowGroup, String flowName, long flowExecutionId) {
- return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+ static DagManager.DagId generateDagId(String flowGroup, String flowName, String flowExecutionId) {
+ return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
}
/**
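Editorial note on the hunk above: a small sketch of how the underscore-joined string form behaves. The `executionIdOf` body mirrors the context line at the top of this hunk (substring after the last `_`, parsed as a long); the class and method names are hypothetical. It shows that the execution id can always be recovered from the string, while the boundary between flow group and flow name cannot when either contains an underscore. That ambiguity is presumably one reason to pass a structured `DagId` around, though the commit message does not say so.

```java
// Hypothetical helper names; only the substring/lastIndexOf logic comes from the diff.
final class DagIdRoundTripSketch {

  /** Builds the same flowGroup_flowName_flowExecutionId form as DagId.toString(). */
  static String join(String flowGroup, String flowName, long flowExecutionId) {
    return String.join("_", flowGroup, flowName, String.valueOf(flowExecutionId));
  }

  /** Recovers the execution id: everything after the last underscore. */
  static long executionIdOf(String dagId) {
    return Long.parseLong(dagId.substring(dagId.lastIndexOf('_') + 1));
  }

  public static void main(String[] args) {
    String id = join("my_group", "my_flow", 12345677L);
    System.out.println(id + " -> " + executionIdOf(id));

    // Two different (group, name) pairs that collapse to the same string.
    System.out.println(join("a_b", "c", 1L).equals(join("a", "b_c", 1L)));
  }
}
```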
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 6cc3cdb32..238c19edc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -101,7 +101,7 @@ public class FSDagStateStore implements DagStateStore {
*/
@Override
public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
- cleanUp(DagManagerUtils.generateDagId(dag));
+ cleanUp(DagManagerUtils.generateDagId(dag).toString());
}
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 9f98a4bad..2a11fef26 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -107,13 +107,13 @@ public class MysqlDagStateStore implements DagStateStore {
@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
- mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag)), getTableNameFromDagId(generateDagId(dag)), convertDagIntoState(dag));
+ mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag).toString()), getTableNameFromDagId(generateDagId(dag).toString()), convertDagIntoState(dag));
}
@Override
public void cleanUp(Dag<JobExecutionPlan> dag)
throws IOException {
- cleanUp(generateDagId(dag));
+ cleanUp(generateDagId(dag).toString());
}
@Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
new file mode 100644
index 000000000..3f35152be
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.RestLiServiceException;
+import com.linkedin.restli.server.UpdateResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import javax.inject.Named;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.helix.HelixManager;
+
+@Slf4j
+public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends GobblinServiceFlowExecutionResourceHandler {
+ private DagActionStore dagActionStore;
+ @Inject
+ public GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler handler,
+ @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
+ Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader, DagActionStore dagActionStore) {
+ super(handler, eventBus, manager, forceLeader);
+ this.dagActionStore = dagActionStore;
+ }
+
+
+ @Override
+ public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ Long flowExecutionId = key.getKey().getFlowExecutionId();
+ try {
+ this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
+ } catch (IOException e) {
+ log.warn(
+ String.format("Failed to add execution resume action for flow %s %s %s to dag action store", flowGroup,
+ flowName, flowExecutionId), e);
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ }
+
+ }
+
+ private void handleException(String flowGroup, String flowName, String flowExecutionId, Exception e) {
+ try {
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
+ throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
+ } else {
+ throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ } catch (IOException | SQLException ex) {
+ throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ }
+
+ @Override
+ public UpdateResponse delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {
+
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ Long flowExecutionId = key.getKey().getFlowExecutionId();
+ try {
+ this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.KILL);
+ return new UpdateResponse(HttpStatus.S_200_OK);
+ } catch (IOException e) {
+ log.warn(
+ String.format("Failed to add execution delete action for flow %s %s %s to dag action store", flowGroup,
+ flowName, flowExecutionId), e);
+ handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
+ }
+ }
+}
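Editorial note on the new handler above: a sketch of its error mapping, with hypothetical names throughout (`DagActionConflictSketch`, `InMemoryActionStore`, the `Status` enum). When adding an action fails, the handler checks whether an action for that flow execution already exists: if so it reports a conflict, otherwise an internal error, and a failure of the lookup itself is also an internal error. The sketch returns a status value instead of throwing `RestLiServiceException`, and it assumes the store rejects a second action for the same key, which the diff implies but does not show.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the 409-versus-500 decision; not Gobblin or Rest.li code.
final class DagActionConflictSketch {

  enum Status { OK_200, CONFLICT_409, INTERNAL_ERROR_500 }

  /** In-memory stand-in that rejects duplicates and can simulate an outage. */
  static final class InMemoryActionStore {
    private final Set<String> keys = new HashSet<>();
    private boolean failing;

    void setFailing(boolean failing) {
      this.failing = failing;
    }

    void add(String key) throws IOException {
      if (failing) {
        throw new IOException("store unavailable");
      }
      if (!keys.add(key)) {
        throw new IOException("duplicate action for " + key);
      }
    }

    boolean exists(String key) throws IOException {
      if (failing) {
        throw new IOException("store unavailable");
      }
      return keys.contains(key);
    }
  }

  /** Tries to record an action and maps any failure to a status. */
  static Status submit(InMemoryActionStore store, String key) {
    try {
      store.add(key);
      return Status.OK_200;
    } catch (IOException addFailure) {
      try {
        // An existing entry means the request duplicates a pending action.
        return store.exists(key) ? Status.CONFLICT_409 : Status.INTERNAL_ERROR_500;
      } catch (IOException lookupFailure) {
        return Status.INTERNAL_ERROR_500;
      }
    }
  }

  public static void main(String[] args) {
    InMemoryActionStore store = new InMemoryActionStore();
    System.out.println(submit(store, "group_flow_1"));
    System.out.println(submit(store, "group_flow_1"));
    store.setFailing(true);
    System.out.println(submit(store, "group_flow_2"));
  }
}
```

In the handler itself, `handleException` throws on every path, so the `return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR)` that follows it in `delete` is never reached; it only satisfies the compiler's requirement for a return value.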
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index cf18ddf9f..71ddd9333 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.google.common.base.Optional;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
@@ -25,6 +26,11 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -57,14 +63,37 @@ public class DagManagerFlowTest {
MockedDagManager dagManager;
int dagNumThreads;
static final String ERROR_MESSAGE = "Waiting for the map to update";
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "dag_action_store";
+ private static final String flowGroup = "testFlowGroup";
+ private static final String flowName = "testFlowName";
+ private static final String flowExecutionId = "12345677";
+ private static final String flowExecutionId_2 = "12345678";
+ private DagActionStore dagActionStore;
@BeforeClass
- public void setUp() {
+ public void setUp() throws Exception {
Properties props = new Properties();
props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
- dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false);
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ dagActionStore = new MysqlDagActionStore(config);
+ dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
+ dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
+ dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), Optional.of(dagActionStore), false);
dagManager.setActive(true);
this.dagNumThreads = dagManager.getNumThreads();
+ Thread.sleep(10000);
+ // Once active, the DagManager should process the stored requests and delete their action entries
+ Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
}
@Test
@@ -77,9 +106,9 @@ public class DagManagerFlowTest {
Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", flowExecutionId2, "FINISH_RUNNING", 1);
Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", flowExecutionId3, "FINISH_RUNNING", 1);
- String dagId1 = DagManagerUtils.generateDagId(dag1);
- String dagId2 = DagManagerUtils.generateDagId(dag2);
- String dagId3 = DagManagerUtils.generateDagId(dag3);
+ String dagId1 = DagManagerUtils.generateDagId(dag1).toString();
+ String dagId2 = DagManagerUtils.generateDagId(dag2).toString();
+ String dagId3 = DagManagerUtils.generateDagId(dag3).toString();
int queue1 = DagManagerUtils.getDagQueueId(dag1, dagNumThreads);
int queue2 = DagManagerUtils.getDagQueueId(dag2, dagNumThreads);
@@ -144,7 +173,7 @@ public class DagManagerFlowTest {
void testFlowSlaWithoutConfig() throws Exception {
long flowExecutionId = System.currentTimeMillis();
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("3", flowExecutionId, "FINISH_RUNNING", 1);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow3"), eq("group3"), anyInt()))
@@ -181,7 +210,7 @@ public class DagManagerFlowTest {
void testFlowSlaWithConfig() throws Exception {
long flowExecutionId = System.currentTimeMillis();
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("4", flowExecutionId, "FINISH_RUNNING", 1);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"), eq("group4"), anyInt()))
@@ -221,7 +250,7 @@ public class DagManagerFlowTest {
void testOrphanFlowKill() throws Exception {
Long flowExecutionId = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10);
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", flowExecutionId, "FINISH_RUNNING", 1);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
// change config to set a small sla
@@ -296,7 +325,11 @@ class CancelPredicate implements Predicate<Void> {
class MockedDagManager extends DagManager {
public MockedDagManager(Config config, boolean instrumentationEnabled) {
- super(config, createJobStatusRetriever(), instrumentationEnabled);
+ super(config, createJobStatusRetriever(), Optional.absent(), instrumentationEnabled);
+ }
+
+ public MockedDagManager(Config config, Optional<DagActionStore> dagactionStore, boolean instrumentationEnabled) {
+ super(config, createJobStatusRetriever(), dagactionStore, instrumentationEnabled);
}
private static JobStatusRetriever createJobStatusRetriever() {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index d85c708f4..655c4f895 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -75,8 +76,8 @@ public class DagManagerTest {
private DagManagerMetrics _dagManagerMetrics;
private DagManager.DagManagerThread _dagManagerThread;
private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
- private LinkedBlockingQueue<String> cancelQueue;
- private LinkedBlockingQueue<String> resumeQueue;
+ private LinkedBlockingQueue<DagManager.DagId> cancelQueue;
+ private LinkedBlockingQueue<DagManager.DagId> resumeQueue;
private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
private Map<String, Dag<JobExecutionPlan>> dags;
@@ -103,7 +104,8 @@ public class DagManagerTest {
this._gobblinServiceQuotaManager = new InMemoryUserQuotaManager(quotaConfig);
this._dagManagerMetrics = new DagManagerMetrics(metricContext);
this._dagManagerMetrics.activate();
- this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
+ this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore,
+ Optional.absent(), queue, cancelQueue,
resumeQueue, true, new HashSet<>(), this._dagManagerMetrics, START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
@@ -206,7 +208,7 @@ public class DagManagerTest {
String jobName2 = "job2";
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -287,7 +289,7 @@ public class DagManagerTest {
String jobName2 = "job2";
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, failureOption, false);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -391,7 +393,7 @@ public class DagManagerTest {
String jobName2 = "job2";
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -443,7 +445,7 @@ public class DagManagerTest {
Assert.assertTrue(this.failedDagIds.contains(dagId));
// Resume dag
- this.resumeQueue.offer(dagId);
+ this.resumeQueue.offer(DagManagerUtils.generateDagId(dag));
// Job2 rerunning
this._dagManagerThread.run();
@@ -470,7 +472,7 @@ public class DagManagerTest {
String jobName2 = "job2";
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -549,7 +551,7 @@ public class DagManagerTest {
String jobName0 = "job0";
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -614,7 +616,7 @@ public class DagManagerTest {
String jobName2 = "job2";
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
- String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -664,13 +666,13 @@ public class DagManagerTest {
}
// Cancel job2
- this.cancelQueue.offer(dagId);
+ this.cancelQueue.offer(DagManagerUtils.generateDagId(dag));
this._dagManagerThread.run();
Assert.assertTrue(this.failedDagIds.contains(dagId));
// Resume dag
- this.resumeQueue.offer(dagId);
+ this.resumeQueue.offer(DagManagerUtils.generateDagId(dag));
// Job2 rerunning
this._dagManagerThread.run();
@@ -697,8 +699,8 @@ public class DagManagerTest {
Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", false);
Dag<JobExecutionPlan> dag1 = buildDag(flowGroupId1, flowExecutionId+1, "FINISH_RUNNING", false);
- String dagId = DagManagerUtils.generateDagId(dag);
- String dagId1 = DagManagerUtils.generateDagId(dag1);
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+ String dagId1 = DagManagerUtils.generateDagId(dag1).toString();
//Add a dag to the queue of dags
@@ -828,7 +830,6 @@ public class DagManagerTest {
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
jobExecutionPlans.add(jobExecutionPlan);
Dag<JobExecutionPlan> dag = (new JobExecutionPlanDagFactory()).createDag(jobExecutionPlans);
- String dagId = DagManagerUtils.generateDagId(dag);
//Add a dag to the queue of dags
this.queue.offer(dag);
@@ -1225,11 +1226,11 @@ public class DagManagerTest {
private final Map<String, Dag<JobExecutionPlan>> dags = new ConcurrentHashMap<>();
public void writeCheckpoint(Dag<JobExecutionPlan> dag) {
- dags.put(DagManagerUtils.generateDagId(dag), dag);
+ dags.put(DagManagerUtils.generateDagId(dag).toString(), dag);
}
public void cleanUp(Dag<JobExecutionPlan> dag) {
- cleanUp(DagManagerUtils.generateDagId(dag));
+ cleanUp(DagManagerUtils.generateDagId(dag).toString());
}
public void cleanUp(String dagId) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 849b8801e..0806d77ca 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -112,7 +112,7 @@ public class FSDagStateStoreTest {
this._dagStateStore.writeCheckpoint(dag);
Assert.assertTrue(dagFile.exists());
- this._dagStateStore.cleanUp(DagManagerUtils.generateDagId(dag));
+ this._dagStateStore.cleanUp(DagManagerUtils.generateDagId(dag).toString());
Assert.assertFalse(dagFile.exists());
}
@@ -129,7 +129,7 @@ public class FSDagStateStoreTest {
List<Dag<JobExecutionPlan>> dags = this._dagStateStore.getDags();
Assert.assertEquals(dags.size(), 2);
- Dag<JobExecutionPlan> singleDag = this._dagStateStore.getDag(DagManagerUtils.generateDagId(dags.get(0)));
+ Dag<JobExecutionPlan> singleDag = this._dagStateStore.getDag(DagManagerUtils.generateDagId(dags.get(0)).toString());
dags.add(singleDag);
for (Dag<JobExecutionPlan> dag: dags) {
Assert.assertEquals(dag.getNodes().size(), 2);
@@ -145,7 +145,7 @@ public class FSDagStateStoreTest {
Set<String> dagIds = this._dagStateStore.getDagIds();
Assert.assertEquals(dagIds.size(), 2);
for (Dag<JobExecutionPlan> dag: dags) {
- Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag)));
+ Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag).toString()));
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index 60f0f6ca1..dfbbd6a92 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -80,15 +80,15 @@ public class MysqlDagStateStoreTest {
_dagStateStore.writeCheckpoint(dag_1);
// Verify get one dag
- Dag<JobExecutionPlan> dag = _dagStateStore.getDag(DagManagerUtils.generateDagId(dag_0));
+ Dag<JobExecutionPlan> dag = _dagStateStore.getDag(DagManagerUtils.generateDagId(dag_0).toString());
Assert.assertEquals(dag.getNodes().get(0), dag_0.getNodes().get(0));
Assert.assertEquals(dag.getNodes().get(1), dag_0.getNodes().get(1));
// Verify get dagIds
Set<String> dagIds = _dagStateStore.getDagIds();
Assert.assertEquals(dagIds.size(), 2);
- Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_0)));
- Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_1)));
+ Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_0).toString()));
+ Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_1).toString()));
// Verify get all dags
List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
@@ -145,7 +145,7 @@ public class MysqlDagStateStoreTest {
Assert.assertEquals(dags.size(), 2);
_dagStateStore.cleanUp(dags.get(0));
- _dagStateStore.cleanUp(DagManagerUtils.generateDagId(dags.get(1)));
+ _dagStateStore.cleanUp(DagManagerUtils.generateDagId(dags.get(1)).toString());
dags = _dagStateStore.getDags();
Assert.assertEquals(dags.size(), 0);