You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/29 18:27:54 UTC

incubator-gobblin git commit: [GOBBLIN-446] Add support for migrating state for all jobs in a job store

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master eda77bcb6 -> 7f55214e2


[GOBBLIN-446] Add support for migrating state for all jobs in a job store

Closes #2321 from htran1/state_store_mig_all_jobs


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7f55214e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7f55214e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7f55214e

Branch: refs/heads/master
Commit: 7f55214e20610a2b99652e003af9a7c684da7e49
Parents: eda77bc
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Mar 29 11:27:20 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 29 11:27:20 2018 -0700

----------------------------------------------------------------------
 .../apache/gobblin/metastore/FsStateStore.java  | 25 ++++++++
 .../gobblin/metastore/MysqlStateStore.java      | 59 +++++++++++++++----
 .../apache/gobblin/metastore/StateStore.java    | 17 +++++-
 .../gobblin/metastore/FsStateStoreTest.java     | 26 ++++++--
 .../apache/gobblin/metastore/ZkStateStore.java  | 26 ++++++++
 .../gobblin/runtime/ZkDatasetStateStore.java    | 15 +++--
 .../runtime/ZkDatasetStateStoreTest.java        | 61 +++++++++++++++----
 .../gobblin/runtime/MysqlDatasetStateStore.java | 16 +++--
 .../gobblin/runtime/StateStoreMigrationCli.java | 62 +++++++++++++-------
 .../runtime/MysqlDatasetStateStoreTest.java     | 54 ++++++++++++++---
 10 files changed, 291 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 54dbdd7..4dd100d 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -317,6 +317,36 @@ public class FsStateStore<T extends State> implements StateStore<T> {
     return names;
   }
 
+  /**
+   * Get the names of all stores under the state store root directory.
+   *
+   * <p>Each store is a directory directly under the root; plain files at the root level are ignored.
+   *
+   * @param predicate only names for which the predicate returns {@code true} are returned
+   * @return (possibly empty) list of matching store names
+   * @throws IOException if the root directory cannot be checked or listed
+   */
+  @Override
+  public List<String> getStoreNames(Predicate<String> predicate)
+      throws IOException {
+    List<String> names = Lists.newArrayList();
+
+    Path storeRootPath = new Path(this.storeRootDir);
+    if (!this.fs.exists(storeRootPath)) {
+      return names;
+    }
+
+    for (FileStatus status : this.fs.listStatus(storeRootPath)) {
+      String storeName = status.getPath().getName();
+      // stores are directories (tables live at <root>/<store>/<table>); skip stray files
+      if (status.isDirectory() && predicate.apply(storeName)) {
+        names.add(storeName);
+      }
+    }
+
+    return names;
+  }
+
   @Override
   public void createAlias(String storeName, String original, String alias) throws IOException {
     Path originalTablePath = new Path(new Path(this.storeRootDir, storeName), original);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index d5ae6ff..169a5e1 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -17,15 +17,6 @@
 
 package org.apache.gobblin.metastore;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.password.PasswordManager;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.io.StreamUtils;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -45,10 +36,23 @@ import java.util.Collections;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
-import javax.sql.DataSource;
+
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.io.StreamUtils;
+
+import javax.sql.DataSource;
+
 /**
  * An implementation of {@link StateStore} backed by MySQL.
  *
@@ -87,6 +91,9 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
   private static final String SELECT_JOB_STATE_NAMES_TEMPLATE =
       "SELECT table_name FROM $TABLE$ WHERE store_name = ?";
 
+  private static final String SELECT_STORE_NAMES_TEMPLATE =
+      "SELECT distinct store_name FROM $TABLE$";
+
   private static final String DELETE_JOB_STORE_TEMPLATE =
       "DELETE FROM $TABLE$ WHERE store_name = ?";
 
@@ -114,6 +121,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
   private final String DELETE_JOB_STORE_SQL;
   private final String DELETE_JOB_STATE_SQL;
   private final String CLONE_JOB_STATE_SQL;
+  private final String SELECT_STORE_NAMES_SQL;
 
   /**
    * Manages the persistence and retrieval of {@link State} in a MySQL database
@@ -137,6 +145,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     DELETE_JOB_STORE_SQL = DELETE_JOB_STORE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     DELETE_JOB_STATE_SQL = DELETE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
     CLONE_JOB_STATE_SQL = CLONE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+    SELECT_STORE_NAMES_SQL = SELECT_STORE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName);
 
     // create table if it does not exist
     String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
@@ -379,6 +388,34 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     return names;
   }
 
+  /**
+   * Get the names of all stores in the state store table; each distinct store name is returned once.
+   *
+   * @param predicate only names for which the predicate returns {@code true} are returned
+   * @return (possibly empty) list of matching store names
+   * @throws IOException if the store names cannot be queried
+   */
+  @Override
+  public List<String> getStoreNames(Predicate<String> predicate)
+      throws IOException {
+    List<String> names = Lists.newArrayList();
+
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement queryStatement = connection.prepareStatement(SELECT_STORE_NAMES_SQL);
+        ResultSet rs = queryStatement.executeQuery()) {
+      while (rs.next()) {
+        String name = rs.getString(1);
+        if (predicate.apply(name)) {
+          names.add(name);
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("Could not query store names", e);
+    }
+
+    return names;
+  }
+
   @Override
   public void createAlias(String storeName, String original, String alias) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 425914a..0a4cf43 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -17,12 +17,13 @@
 
 package org.apache.gobblin.metastore;
 
-import com.google.common.base.Predicate;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.base.Predicate;
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
 import org.apache.gobblin.metastore.predicates.StateStorePredicate;
@@ -172,6 +173,22 @@ public interface StateStore<T extends State> {
       throws IOException;
 
   /**
+   * Get the names of all stores in this state store.
+   *
+   * <p>The default implementation throws {@link UnsupportedOperationException}; state stores that can
+   * enumerate their stores override it.
+   *
+   * @param predicate only names for which the predicate returns {@code true} are returned
+   * @return (possibly empty) list of matching store names
+   * @throws IOException if the store names cannot be read
+   * @throws UnsupportedOperationException if this state store does not support listing store names
+   */
+  public default List<String> getStoreNames(Predicate<String> predicate)
+      throws IOException {
+    throw new UnsupportedOperationException("getStoreNames is not implemented by " + getClass().getName());
+  }
+
+  /**
    * Create an alias for an existing table.
    *
    * @param storeName store name

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
index 21c81ec..70160c8 100644
--- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
@@ -17,13 +17,9 @@
 
 package org.apache.gobblin.metastore;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.ClassAliasResolver;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,9 +30,15 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
 
 
 /**
@@ -65,6 +67,7 @@ public class FsStateStoreTest {
 
     // cleanup in case files left behind by a prior run
     this.stateStore.delete("testStore");
+    this.stateStore.delete("testStore2");
   }
 
   @Test
@@ -89,6 +92,9 @@ public class FsStateStoreTest {
     Assert.assertFalse(this.stateStore.exists("testStore", "testTable"));
     this.stateStore.putAll("testStore", "testTable", states);
     Assert.assertTrue(this.stateStore.exists("testStore", "testTable"));
+
+    // for testing of getStoreNames
+    this.stateStore.putAll("testStore2", "testTable", states);
   }
 
   @Test(dependsOnMethods = { "testPut" })
@@ -117,6 +123,19 @@ public class FsStateStoreTest {
     Assert.assertEquals(states.get(2).getProp("k3"), "v3");
   }
 
+  @Test(dependsOnMethods = { "testGetAlias" })
+  public void testGetStoreNames() throws IOException {
+    List<String> storeNames = this.stateStore.getStoreNames(Predicates.alwaysTrue());
+    Collections.sort(storeNames);
+
+    // compare whole lists so a failure reports the actual names instead of just "expected true"
+    Assert.assertEquals(storeNames, Lists.newArrayList("testStore", "testStore2"));
+
+    // the predicate must filter the returned names
+    Assert.assertEquals(this.stateStore.getStoreNames(name -> name.equals("testStore2")),
+        Lists.newArrayList("testStore2"));
+  }
+
 //  Disable backwards compatibility change, since we are doing a major version upgrade
 //  .. and this is related to previous migration.
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index 59d0e4f..c09c42a 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -252,6 +252,32 @@ public class ZkStateStore<T extends State> implements StateStore<T> {
 
     return names;
   }
+
+  /**
+   * Get the names of all stores, i.e. the child nodes directly under this state store's base path.
+   *
+   * @param predicate only names for which the predicate returns {@code true} are returned
+   * @return (possibly empty) list of matching store names
+   * @throws IOException as declared by {@link StateStore#getStoreNames(Predicate)}
+   */
+  @Override
+  public List<String> getStoreNames(Predicate<String> predicate)
+      throws IOException {
+    List<String> names = Lists.newArrayList();
+    List<String> children = propStore.getChildNames(formPath(""), 0);
+
+    // getChildNames may return null (presumably when the base path does not exist yet)
+    if (children != null) {
+      for (String child : children) {
+        if (predicate.apply(child)) {
+          names.add(child);
+        }
+      }
+    }
+
+    return names;
+  }
+
   @Override
   public void createAlias(String storeName, String original, String alias) throws IOException {
     String pathOriginal = formPath(storeName, original);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
index e9ecf35..db882c0 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
@@ -17,19 +17,22 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metastore.ZkStateStore;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 1091cf7..2ee5f1d 100644
--- a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -17,31 +17,39 @@
 
 package org.apache.gobblin.runtime;
 
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metastore.ZkStateStore;
-import org.apache.gobblin.metastore.ZkStateStoreConfigurationKeys;
-import org.apache.gobblin.util.ClassAliasResolver;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+
 import org.apache.curator.test.TestingServer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicates;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.ZkStateStore;
+import org.apache.gobblin.metastore.ZkStateStoreConfigurationKeys;
+import org.apache.gobblin.util.ClassAliasResolver;
+
 
 /**
  * Unit tests for {@link ZkDatasetStateStore}.
  **/
 @Test(groups = { "gobblin.runtime" })
 public class ZkDatasetStateStoreTest {
-  private static final String TEST_JOB_NAME = "TestJob";
-  private static final String TEST_JOB_ID = "TestJob1";
+  private static final String TEST_JOB_NAME = "TestJobName1";
+  private static final String TEST_JOB_NAME2 = "TestJobName2";
+
+  private static final String TEST_JOB_ID = "TestJobId1";
   private static final String TEST_TASK_ID_PREFIX = "TestTask-";
   private static final String TEST_DATASET_URN = "TestDataset";
   private static final String TEST_DATASET_URN2 = "TestDataset2";
@@ -73,6 +81,8 @@ public class ZkDatasetStateStoreTest {
     // clear data that may have been left behind by a prior test run
     zkJobStateStore.delete(TEST_JOB_NAME);
     zkDatasetStateStore.delete(TEST_JOB_NAME);
+    zkJobStateStore.delete(TEST_JOB_NAME2);
+    zkDatasetStateStore.delete(TEST_JOB_NAME2);
   }
 
   @Test
@@ -97,6 +107,12 @@ public class ZkDatasetStateStoreTest {
     zkJobStateStore.put(TEST_JOB_NAME,
         ZkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + ZkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
         jobState);
+
+    // second job name for testing getting store names in a later test case
+    jobState.setJobName(TEST_JOB_NAME2);
+    zkJobStateStore.put(TEST_JOB_NAME2,
+        ZkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + ZkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
   }
 
   @Test(dependsOnMethods = "testPersistJobState")
@@ -150,6 +166,10 @@ public class ZkDatasetStateStoreTest {
     datasetState.setDuration(2000);
 
     zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
+
+    // second job name for testing getting store names in a later test case
+    datasetState.setJobName(TEST_JOB_NAME2);
+    zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
   }
 
   @Test(dependsOnMethods = "testPersistDatasetState")
@@ -175,7 +195,24 @@ public class ZkDatasetStateStoreTest {
     }
   }
 
-  @Test(dependsOnMethods = "testGetDatasetState")
+  @Test(dependsOnMethods = { "testGetDatasetState" })
+  public void testGetStoreNames() throws IOException {
+    List<String> storeNames = this.zkJobStateStore.getStoreNames(Predicates.alwaysTrue());
+    Collections.sort(storeNames);
+
+    Assert.assertEquals(storeNames.size(), 2);
+    Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+    Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+
+    storeNames = this.zkDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
+    Collections.sort(storeNames);
+
+    Assert.assertEquals(storeNames.size(), 2);
+    Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+    Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+  }
+
+  @Test(dependsOnMethods = "testGetStoreNames")
   public void testGetPreviousDatasetStatesByUrns() throws IOException {
     Map<String, JobState.DatasetState> datasetStatesByUrns =
         zkDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
@@ -240,6 +277,8 @@ public class ZkDatasetStateStoreTest {
   public void tearDown() throws IOException {
     zkJobStateStore.delete(TEST_JOB_NAME);
     zkDatasetStateStore.delete(TEST_JOB_NAME);
+    zkJobStateStore.delete(TEST_JOB_NAME2);
+    zkDatasetStateStore.delete(TEST_JOB_NAME2);
 
     if (testingServer != null) {
       testingServer.close();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 741ac07..6e3e57a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -17,19 +17,23 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metastore.MysqlStateStore;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+
 import javax.sql.DataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
index d4cb91e..2c57da9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
@@ -17,26 +17,33 @@
 
 package org.apache.gobblin.runtime;
 
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
+import java.util.List;
 import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.cli.CliApplication;
 import org.apache.gobblin.runtime.cli.CliObjectFactory;
+import org.apache.gobblin.runtime.cli.CliObjectOption;
 import org.apache.gobblin.runtime.cli.CliObjectSupport;
 import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsCliObjectFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.util.ConfigUtils;
 
+import lombok.extern.slf4j.Slf4j;
 import static org.apache.gobblin.configuration.ConfigurationKeys.*;
 
 
@@ -53,13 +60,8 @@ public class StateStoreMigrationCli implements CliApplication {
   private static final String SOURCE_KEY = "source";
   private static final String DESTINATION_KEY = "destination";
   private static final String JOB_NAME_KEY = "jobName";
-
-  /**
-   * Assume that there's no additional '/' in the end of state.store.dir's value
-   */
-  private String extractStoreName(String dirPath) {
-    return dirPath.substring(dirPath.lastIndexOf('/') + 1);
-  }
+  private static final String MIGRATE_ALL_JOBS = "migrateAllJobs";
+  private static final String DEFAULT_MIGRATE_ALL_JOBS = "false";
 
   @Override
   public void run(String[] args) throws Exception {
@@ -69,26 +71,50 @@ public class StateStoreMigrationCli implements CliApplication {
     FileSystem fs = FileSystem.get(new Configuration());
     FSDataInputStream inputStream = fs.open(command.path);
     Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
-    String storeName = this.extractStoreName(config.getConfig(SOURCE_KEY).getString(STATE_STORE_ROOT_DIR_KEY));
 
     Preconditions.checkNotNull(config.getObject(SOURCE_KEY));
     Preconditions.checkNotNull(config.getObject(DESTINATION_KEY));
-    Preconditions.checkNotNull(config.getString(JOB_NAME_KEY));
 
     DatasetStateStore dstDatasetStateStore =
         DatasetStateStore.buildDatasetStateStore(config.getConfig(DESTINATION_KEY));
     DatasetStateStore srcDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(SOURCE_KEY));
+
+    // if migrating state for all jobs then list the store names (job names) and copy the current jst files
+    if (ConfigUtils.getBoolean(config, MIGRATE_ALL_JOBS, Boolean.valueOf(DEFAULT_MIGRATE_ALL_JOBS))) {
+      List<String> jobNames = srcDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
 
-    Map<String, JobState.DatasetState> map =
-        srcDatasetStateStore.getLatestDatasetStatesByUrns(config.getString(JOB_NAME_KEY));
+      for (String jobName : jobNames) {
+        migrateStateForJob(srcDatasetStateStore, dstDatasetStateStore, jobName, command.deleteSourceStateStore);
+      }
+    } else {
+      // config.getString throws on a missing key rather than returning null, so check presence explicitly
+      Preconditions.checkArgument(config.hasPath(JOB_NAME_KEY), "%s must be set unless %s is true",
+          JOB_NAME_KEY, MIGRATE_ALL_JOBS);
+      migrateStateForJob(srcDatasetStateStore, dstDatasetStateStore, config.getString(JOB_NAME_KEY),
+          command.deleteSourceStateStore);
+    }
+  }
+
+  /**
+   * Copies the latest dataset states of a single job from the source to the destination state store.
+   *
+   * @param jobName job whose states are migrated; also the name of its store in the source state store
+   * @param deleteFromSource whether to delete the job's store from the source state store after copying
+   * @throws IOException if the states cannot be read from the source or written to the destination
+   */
+  private static void migrateStateForJob(DatasetStateStore srcDatasetStateStore, DatasetStateStore dstDatasetStateStore,
+      String jobName, boolean deleteFromSource) throws IOException {
+    log.info("Migrating state for job {}", jobName);
+    Map<String, JobState.DatasetState> map = srcDatasetStateStore.getLatestDatasetStatesByUrns(jobName);
     for (Map.Entry<String, JobState.DatasetState> entry : map.entrySet()) {
       dstDatasetStateStore.persistDatasetState(entry.getKey(), entry.getValue());
     }
-    if (command.deleteSourceStateStore) {
+
+    if (deleteFromSource) {
       try {
-        srcDatasetStateStore.delete(storeName);
+        srcDatasetStateStore.delete(jobName);
       } catch (IOException ioe) {
-        log.warn("The source state store has been deleted.", ioe);
+        log.warn("Failed to delete the source state store for job {}", jobName, ioe);
       }
     }
   }
@@ -107,6 +124,7 @@ public class StateStoreMigrationCli implements CliApplication {
       this.path = new Path(path);
     }
 
+    @CliObjectOption
     public void deleteSourceStateStore() {
       this.deleteSourceStateStore = true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index 45bb44d..e8ea281 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -17,6 +17,19 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -26,13 +39,6 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.util.ClassAliasResolver;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 
 /**
@@ -44,6 +50,7 @@ public class MysqlDatasetStateStoreTest {
   private static final String TEST_STATE_STORE = "TestStateStore";
   private static final String TEST_JOB_NAME = "TestJob";
   private static final String TEST_JOB_NAME_LOWER = "testjob";
+  private static final String TEST_JOB_NAME2 = "TestJob2";
   private static final String TEST_JOB_ID = "TestJob1";
   private static final String TEST_TASK_ID_PREFIX = "TestTask-";
   private static final String TEST_DATASET_URN = "TestDataset";
@@ -87,6 +94,8 @@ public class MysqlDatasetStateStoreTest {
     // clear data that may have been left behind by a prior test run
     dbJobStateStore.delete(TEST_JOB_NAME);
     dbDatasetStateStore.delete(TEST_JOB_NAME);
+    dbJobStateStore.delete(TEST_JOB_NAME2);
+    dbDatasetStateStore.delete(TEST_JOB_NAME2);
   }
 
   @Test
@@ -118,6 +127,12 @@ public class MysqlDatasetStateStoreTest {
     dbJobStateStore.put(TEST_JOB_NAME_LOWER,
         MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
         jobState);
+
+    // second job name for testing getting store names in a later test case
+    jobState.setJobName(TEST_JOB_NAME2);
+    dbJobStateStore.put(TEST_JOB_NAME2,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
   }
 
   @Test(dependsOnMethods = "testPersistJobState")
@@ -189,6 +204,10 @@ public class MysqlDatasetStateStoreTest {
     datasetState.setDuration(3000);
 
     dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN_LOWER, datasetState);
+
+    // second job name for testing getting store names in a later test case
+    datasetState.setJobName(TEST_JOB_NAME2);
+    dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
   }
 
   @Test(dependsOnMethods = "testPersistDatasetState")
@@ -214,7 +233,26 @@ public class MysqlDatasetStateStoreTest {
       }
     }
   }
-  @Test(dependsOnMethods = "testGetDatasetState")
+  @Test(dependsOnMethods = { "testGetDatasetState" })
+  public void testGetStoreNames() throws IOException {
+    List<String> storeNames = this.dbJobStateStore.getStoreNames(Predicates.alwaysTrue());
+    Collections.sort(storeNames);
+
+    // the job state is also stored under TEST_JOB_NAME_LOWER, which sorts after the capitalized names
+    Assert.assertEquals(storeNames.size(), 3);
+    Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+    Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+    Assert.assertEquals(storeNames.get(2), TEST_JOB_NAME_LOWER);
+
+    storeNames = this.dbDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
+    Collections.sort(storeNames);
+
+    Assert.assertEquals(storeNames.size(), 2);
+    Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
+    Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
+  }
+
+  @Test(dependsOnMethods = "testGetStoreNames")
   public void testGetPreviousDatasetStatesByUrns() throws IOException {
     Map<String, JobState.DatasetState> datasetStatesByUrns =
         dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);