You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/29 18:27:54 UTC
incubator-gobblin git commit: [GOBBLIN-446] Add support for migrating
state for all jobs in a job store
Repository: incubator-gobblin
Updated Branches:
refs/heads/master eda77bcb6 -> 7f55214e2
[GOBBLIN-446] Add support for migrating state for all jobs in a job store
Closes #2321 from htran1/state_store_mig_all_jobs
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7f55214e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7f55214e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7f55214e
Branch: refs/heads/master
Commit: 7f55214e20610a2b99652e003af9a7c684da7e49
Parents: eda77bc
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Mar 29 11:27:20 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 29 11:27:20 2018 -0700
----------------------------------------------------------------------
.../apache/gobblin/metastore/FsStateStore.java | 25 ++++++++
.../gobblin/metastore/MysqlStateStore.java | 59 +++++++++++++++----
.../apache/gobblin/metastore/StateStore.java | 17 +++++-
.../gobblin/metastore/FsStateStoreTest.java | 26 ++++++--
.../apache/gobblin/metastore/ZkStateStore.java | 26 ++++++++
.../gobblin/runtime/ZkDatasetStateStore.java | 15 +++--
.../runtime/ZkDatasetStateStoreTest.java | 61 +++++++++++++++----
.../gobblin/runtime/MysqlDatasetStateStore.java | 16 +++--
.../gobblin/runtime/StateStoreMigrationCli.java | 62 +++++++++++++-------
.../runtime/MysqlDatasetStateStoreTest.java | 54 ++++++++++++++---
10 files changed, 291 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 54dbdd7..4dd100d 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -317,6 +317,31 @@ public class FsStateStore<T extends State> implements StateStore<T> {
return names;
}
+ /**
+ * Get the names of the stores (directories directly under the state store root) matching a predicate.
+ *
+ * @param predicate only store names for which the predicate returns true are included
+ * @return (possibly empty) list of matching store names; empty if the state store root does not exist
+ * @throws IOException if the file system cannot be checked or listed
+ */
+ @Override
+ public List<String> getStoreNames(Predicate<String> predicate) throws IOException {
+ List<String> names = Lists.newArrayList();
+ Path storeRootPath = new Path(this.storeRootDir);
+ if (!this.fs.exists(storeRootPath)) {
+ return names;
+ }
+
+ for (FileStatus status : this.fs.listStatus(storeRootPath)) {
+ // a store is a directory of tables (storeRootDir/storeName/table), so plain files at the root are skipped
+ String name = status.getPath().getName();
+ if (status.isDirectory() && predicate.apply(name)) {
+ names.add(name);
+ }
+ }
+ return names;
+ }
+
@Override
public void createAlias(String storeName, String original, String alias) throws IOException {
Path originalTablePath = new Path(new Path(this.storeRootDir, storeName), original);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index d5ae6ff..169a5e1 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -17,15 +17,6 @@
package org.apache.gobblin.metastore;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.password.PasswordManager;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.io.StreamUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -45,10 +36,23 @@ import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import javax.sql.DataSource;
+
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.hadoop.io.Text;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.io.StreamUtils;
+
+import javax.sql.DataSource;
+
/**
* An implementation of {@link StateStore} backed by MySQL.
*
@@ -87,6 +91,9 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
private static final String SELECT_JOB_STATE_NAMES_TEMPLATE =
"SELECT table_name FROM $TABLE$ WHERE store_name = ?";
+ private static final String SELECT_STORE_NAMES_TEMPLATE =
+ "SELECT distinct store_name FROM $TABLE$";
+
private static final String DELETE_JOB_STORE_TEMPLATE =
"DELETE FROM $TABLE$ WHERE store_name = ?";
@@ -114,6 +121,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
private final String DELETE_JOB_STORE_SQL;
private final String DELETE_JOB_STATE_SQL;
private final String CLONE_JOB_STATE_SQL;
+ private final String SELECT_STORE_NAMES_SQL;
/**
* Manages the persistence and retrieval of {@link State} in a MySQL database
@@ -137,6 +145,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
DELETE_JOB_STORE_SQL = DELETE_JOB_STORE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
DELETE_JOB_STATE_SQL = DELETE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
CLONE_JOB_STATE_SQL = CLONE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+ SELECT_STORE_NAMES_SQL = SELECT_STORE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
// create table if it does not exist
String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
@@ -379,6 +388,36 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
return names;
}
+ /**
+ * Get the distinct store names in the state store table that match a predicate.
+ *
+ * <p>Names are read with {@code SELECT distinct store_name}, so each store name appears at most once.
+ *
+ * @param predicate only store names for which the predicate returns true are included
+ * @return (possibly empty) list of matching store names
+ * @throws IOException if the store names cannot be queried
+ */
+ @Override
+ public List<String> getStoreNames(Predicate<String> predicate) throws IOException {
+ List<String> names = Lists.newArrayList();
+
+ // connection, statement and result set are all closed by try-with-resources, even when the query fails
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement queryStatement = connection.prepareStatement(SELECT_STORE_NAMES_SQL);
+ ResultSet rs = queryStatement.executeQuery()) {
+ while (rs.next()) {
+ String name = rs.getString(1);
+ if (predicate.apply(name)) {
+ names.add(name);
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException("Could not query store names", e);
+ }
+
+ return names;
+ }
+
@Override
public void createAlias(String storeName, String original, String alias) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 425914a..0a4cf43 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -17,12 +17,13 @@
package org.apache.gobblin.metastore;
-import com.google.common.base.Predicate;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import com.google.common.base.Predicate;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
@@ -172,6 +173,18 @@ public interface StateStore<T extends State> {
throws IOException;
/**
+ * Get the names of the stores in this state store that match a predicate.
+ *
+ * @param predicate only store names for which the predicate returns true are included
+ * @return (possibly empty) list of matching store names
+ * @throws IOException if the underlying storage cannot be queried
+ * @throws UnsupportedOperationException if the implementation does not support listing stores (the default)
+ */
+ public default List<String> getStoreNames(Predicate<String> predicate) throws IOException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
* Create an alias for an existing table.
*
* @param storeName store name
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
index 21c81ec..70160c8 100644
--- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
@@ -17,13 +17,9 @@
package org.apache.gobblin.metastore;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.ClassAliasResolver;
import java.io.IOException;
import java.net.URL;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -34,9 +30,15 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
/**
@@ -65,6 +67,7 @@ public class FsStateStoreTest {
// cleanup in case files left behind by a prior run
this.stateStore.delete("testStore");
+ this.stateStore.delete("testStore2");
}
@Test
@@ -89,6 +92,9 @@ public class FsStateStoreTest {
Assert.assertFalse(this.stateStore.exists("testStore", "testTable"));
this.stateStore.putAll("testStore", "testTable", states);
Assert.assertTrue(this.stateStore.exists("testStore", "testTable"));
+
+ // for testing of getStoreNames
+ this.stateStore.putAll("testStore2", "testTable", states);
}
@Test(dependsOnMethods = { "testPut" })
@@ -117,6 +123,16 @@ public class FsStateStoreTest {
Assert.assertEquals(states.get(2).getProp("k3"), "v3");
}
+ @Test(dependsOnMethods = { "testGetAlias" })
+ public void testGetStoreNames() throws IOException {
+ List<String> storeNames = this.stateStore.getStoreNames(Predicates.alwaysTrue());
+ Collections.sort(storeNames); // listing order is not guaranteed, so compare in sorted order
+
+ Assert.assertEquals(storeNames.size(), 2);
+ Assert.assertEquals(storeNames.get(0), "testStore");
+ Assert.assertEquals(storeNames.get(1), "testStore2");
+ }
+
// Disable backwards compatibility change, since we are doing a major version upgrade
// .. and this is related to previous migration.
@Test
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index 59d0e4f..c09c42a 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -252,6 +252,32 @@ public class ZkStateStore<T extends State> implements StateStore<T> {
return names;
}
+
+ /**
+ * Get the names of the stores (child nodes of the state store root path) that match a predicate.
+ *
+ * @param predicate only store names for which the predicate returns true are included
+ * @return (possibly empty) list of matching store names
+ * @throws IOException as declared by {@link StateStore#getStoreNames}
+ */
+ @Override
+ public List<String> getStoreNames(Predicate<String> predicate) throws IOException {
+ List<String> names = Lists.newArrayList();
+ // formPath("") presumably resolves to the root path whose children are the stores — confirm against formPath
+ List<String> children = propStore.getChildNames(formPath(""), 0);
+
+ // getChildNames can return null (presumably when the root node does not exist); report no stores then
+ if (children == null) {
+ return names;
+ }
+ for (String child : children) {
+ if (predicate.apply(child)) {
+ names.add(child);
+ }
+ }
+ return names;
+ }
+
@Override
public void createAlias(String storeName, String original, String alias) throws IOException {
String pathOriginal = formPath(storeName, original);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
index e9ecf35..db882c0 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
@@ -17,19 +17,22 @@
package org.apache.gobblin.runtime;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.CharMatcher;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metastore.ZkStateStore;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 1091cf7..2ee5f1d 100644
--- a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -17,31 +17,39 @@
package org.apache.gobblin.runtime;
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metastore.ZkStateStore;
-import org.apache.gobblin.metastore.ZkStateStoreConfigurationKeys;
-import org.apache.gobblin.util.ClassAliasResolver;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+
import org.apache.curator.test.TestingServer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Predicates;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.ZkStateStore;
+import org.apache.gobblin.metastore.ZkStateStoreConfigurationKeys;
+import org.apache.gobblin.util.ClassAliasResolver;
+
/**
* Unit tests for {@link ZkDatasetStateStore}.
**/
@Test(groups = { "gobblin.runtime" })
public class ZkDatasetStateStoreTest {
- private static final String TEST_JOB_NAME = "TestJob";
- private static final String TEST_JOB_ID = "TestJob1";
+ private static final String TEST_JOB_NAME = "TestJobName1";
+ private static final String TEST_JOB_NAME2 = "TestJobName2";
+
+ private static final String TEST_JOB_ID = "TestJobId1";
private static final String TEST_TASK_ID_PREFIX = "TestTask-";
private static final String TEST_DATASET_URN = "TestDataset";
private static final String TEST_DATASET_URN2 = "TestDataset2";
@@ -73,6 +81,8 @@ public class ZkDatasetStateStoreTest {
// clear data that may have been left behind by a prior test run
zkJobStateStore.delete(TEST_JOB_NAME);
zkDatasetStateStore.delete(TEST_JOB_NAME);
+ zkJobStateStore.delete(TEST_JOB_NAME2);
+ zkDatasetStateStore.delete(TEST_JOB_NAME2);
}
@Test
@@ -97,6 +107,12 @@ public class ZkDatasetStateStoreTest {
zkJobStateStore.put(TEST_JOB_NAME,
ZkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + ZkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
jobState);
+
+ // second job name for testing getting store names in a later test case
+ jobState.setJobName(TEST_JOB_NAME2);
+ zkJobStateStore.put(TEST_JOB_NAME2,
+ ZkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + ZkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ jobState);
}
@Test(dependsOnMethods = "testPersistJobState")
@@ -150,6 +166,10 @@ public class ZkDatasetStateStoreTest {
datasetState.setDuration(2000);
zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
+
+ // second job name for testing getting store names in a later test case
+ datasetState.setJobName(TEST_JOB_NAME2);
+ zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
}
@Test(dependsOnMethods = "testPersistDatasetState")
@@ -175,7 +195,24 @@ public class ZkDatasetStateStoreTest {
}
}
- @Test(dependsOnMethods = "testGetDatasetState")
+ @Test(dependsOnMethods = { "testGetDatasetState" })
+ public void testGetStoreNames() throws IOException {
+ List<String> storeNames = this.zkJobStateStore.getStoreNames(Predicates.alwaysTrue());
+ Collections.sort(storeNames);
+ // both job names were written to the job state store by the earlier persist tests
+ Assert.assertEquals(storeNames.size(), 2);
+ Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+ Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+
+ storeNames = this.zkDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
+ Collections.sort(storeNames);
+ // both job names were written to the dataset state store by the earlier persist tests
+ Assert.assertEquals(storeNames.size(), 2);
+ Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+ Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+ }
+
+ @Test(dependsOnMethods = "testGetStoreNames")
public void testGetPreviousDatasetStatesByUrns() throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns =
zkDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
@@ -240,6 +277,8 @@ public class ZkDatasetStateStoreTest {
public void tearDown() throws IOException {
zkJobStateStore.delete(TEST_JOB_NAME);
zkDatasetStateStore.delete(TEST_JOB_NAME);
+ zkJobStateStore.delete(TEST_JOB_NAME2);
+ zkDatasetStateStore.delete(TEST_JOB_NAME2);
if (testingServer != null) {
testingServer.close();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 741ac07..6e3e57a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -17,19 +17,23 @@
package org.apache.gobblin.runtime;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.CharMatcher;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metastore.MysqlStateStore;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+
import javax.sql.DataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
index d4cb91e..2c57da9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
@@ -17,26 +17,33 @@
package org.apache.gobblin.runtime;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
+import java.util.List;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.cli.CliApplication;
import org.apache.gobblin.runtime.cli.CliObjectFactory;
+import org.apache.gobblin.runtime.cli.CliObjectOption;
import org.apache.gobblin.runtime.cli.CliObjectSupport;
import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsCliObjectFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.util.ConfigUtils;
+import lombok.extern.slf4j.Slf4j;
import static org.apache.gobblin.configuration.ConfigurationKeys.*;
@@ -53,13 +60,8 @@ public class StateStoreMigrationCli implements CliApplication {
private static final String SOURCE_KEY = "source";
private static final String DESTINATION_KEY = "destination";
private static final String JOB_NAME_KEY = "jobName";
-
- /**
- * Assume that there's no additional '/' in the end of state.store.dir's value
- */
- private String extractStoreName(String dirPath) {
- return dirPath.substring(dirPath.lastIndexOf('/') + 1);
- }
+ private static final String MIGRATE_ALL_JOBS = "migrateAllJobs";
+ private static final String DEFAULT_MIGRATE_ALL_JOBS = "false";
@Override
public void run(String[] args) throws Exception {
@@ -69,26 +71,41 @@ public class StateStoreMigrationCli implements CliApplication {
FileSystem fs = FileSystem.get(new Configuration());
FSDataInputStream inputStream = fs.open(command.path);
Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
- String storeName = this.extractStoreName(config.getConfig(SOURCE_KEY).getString(STATE_STORE_ROOT_DIR_KEY));
Preconditions.checkNotNull(config.getObject(SOURCE_KEY));
Preconditions.checkNotNull(config.getObject(DESTINATION_KEY));
- Preconditions.checkNotNull(config.getString(JOB_NAME_KEY));
DatasetStateStore dstDatasetStateStore =
DatasetStateStore.buildDatasetStateStore(config.getConfig(DESTINATION_KEY));
DatasetStateStore srcDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(SOURCE_KEY));
+ Map<String, JobState.DatasetState> map;
+
+ // if migrating state for all jobs then list the store names (job names) and copy the current jst files
+ if (ConfigUtils.getBoolean(config, MIGRATE_ALL_JOBS, Boolean.valueOf(DEFAULT_MIGRATE_ALL_JOBS))) {
+ List<String> jobNames = srcDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
- Map<String, JobState.DatasetState> map =
- srcDatasetStateStore.getLatestDatasetStatesByUrns(config.getString(JOB_NAME_KEY));
+ for (String jobName : jobNames) {
+ migrateStateForJob(srcDatasetStateStore, dstDatasetStateStore, jobName, command.deleteSourceStateStore);
+ }
+ } else {
+ Preconditions.checkNotNull(config.getString(JOB_NAME_KEY));
+ migrateStateForJob(srcDatasetStateStore, dstDatasetStateStore, config.getString(JOB_NAME_KEY),
+ command.deleteSourceStateStore);
+ }
+ }
+
+ /**
+ * Copy the latest dataset states of one job from the source to the destination state store and, if requested,
+ * delete the job's store from the source afterwards. A failed delete is logged and does not abort the migration.
+ */
+ private static void migrateStateForJob(DatasetStateStore srcDatasetStateStore, DatasetStateStore dstDatasetStateStore,
+ String jobName, boolean deleteFromSource) throws IOException {
+ Map<String, JobState.DatasetState> map = srcDatasetStateStore.getLatestDatasetStatesByUrns(jobName);
for (Map.Entry<String, JobState.DatasetState> entry : map.entrySet()) {
dstDatasetStateStore.persistDatasetState(entry.getKey(), entry.getValue());
}
- if (command.deleteSourceStateStore) {
+
+ // only reached after every dataset state was persisted; a persist failure propagates and skips the delete
+ if (deleteFromSource) {
try {
- srcDatasetStateStore.delete(storeName);
+ srcDatasetStateStore.delete(jobName);
} catch (IOException ioe) {
- log.warn("The source state store has been deleted.", ioe);
+ log.warn("Failed to delete the source state store for job {}", jobName, ioe);
}
}
}
@@ -107,6 +124,7 @@ public class StateStoreMigrationCli implements CliApplication {
this.path = new Path(path);
}
+ /**
+ * Sets the flag that makes the migration delete each migrated job's store from the source state store.
+ * Exposed on the command line through {@code @CliObjectOption} (presumably as a no-argument flag — confirm).
+ */
+ @CliObjectOption
public void deleteSourceStateStore() {
this.deleteSourceStateStore = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index 45bb44d..e8ea281 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -17,6 +17,19 @@
package org.apache.gobblin.runtime;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -26,13 +39,6 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.util.ClassAliasResolver;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
/**
@@ -44,6 +50,7 @@ public class MysqlDatasetStateStoreTest {
private static final String TEST_STATE_STORE = "TestStateStore";
private static final String TEST_JOB_NAME = "TestJob";
private static final String TEST_JOB_NAME_LOWER = "testjob";
+ private static final String TEST_JOB_NAME2 = "TestJob2";
private static final String TEST_JOB_ID = "TestJob1";
private static final String TEST_TASK_ID_PREFIX = "TestTask-";
private static final String TEST_DATASET_URN = "TestDataset";
@@ -87,6 +94,8 @@ public class MysqlDatasetStateStoreTest {
// clear data that may have been left behind by a prior test run
dbJobStateStore.delete(TEST_JOB_NAME);
dbDatasetStateStore.delete(TEST_JOB_NAME);
+ dbJobStateStore.delete(TEST_JOB_NAME2);
+ dbDatasetStateStore.delete(TEST_JOB_NAME2);
}
@Test
@@ -118,6 +127,12 @@ public class MysqlDatasetStateStoreTest {
dbJobStateStore.put(TEST_JOB_NAME_LOWER,
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
jobState);
+
+ // second job name for testing getting store names in a later test case
+ jobState.setJobName(TEST_JOB_NAME2);
+ dbJobStateStore.put(TEST_JOB_NAME2,
+ MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ jobState);
}
@Test(dependsOnMethods = "testPersistJobState")
@@ -189,6 +204,10 @@ public class MysqlDatasetStateStoreTest {
datasetState.setDuration(3000);
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN_LOWER, datasetState);
+
+ // second job name for testing getting store names in a later test case
+ datasetState.setJobName(TEST_JOB_NAME2);
+ dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
}
@Test(dependsOnMethods = "testPersistDatasetState")
@@ -214,7 +233,26 @@ public class MysqlDatasetStateStoreTest {
}
}
- @Test(dependsOnMethods = "testGetDatasetState")
+ @Test(dependsOnMethods = { "testGetDatasetState" })
+ public void testGetStoreNames() throws IOException {
+ List<String> storeNames = this.dbJobStateStore.getStoreNames(Predicates.alwaysTrue());
+ Collections.sort(storeNames);
+
+ // the lower-case job name was also written to the job state store by an earlier persist test
+ Assert.assertEquals(storeNames.size(), 3);
+ Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+ Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+ Assert.assertEquals(storeNames.get(2), TEST_JOB_NAME_LOWER);
+
+ storeNames = this.dbDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
+ Collections.sort(storeNames);
+
+ Assert.assertEquals(storeNames.size(), 2);
+ Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+ Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+ }
+
+ @Test(dependsOnMethods = "testGetStoreNames")
public void testGetPreviousDatasetStatesByUrns() throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns =
dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);