You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/10/04 16:25:47 UTC

incubator-gobblin git commit: [GOBBLIN-270] Statestore Migration Script

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 9795cd58d -> 8af87cb78


[GOBBLIN-270] Statestore Migration Script

Closes #2122 from autumnust/statestoremigration


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8af87cb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8af87cb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8af87cb7

Branch: refs/heads/master
Commit: 8af87cb7807749b14640e4cc1feaeec173552d55
Parents: 9795cd5
Author: Lei Sun <au...@gmail.com>
Authored: Wed Oct 4 09:24:43 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Oct 4 09:24:43 2017 -0700

----------------------------------------------------------------------
 gobblin-core/build.gradle                       |   3 +-
 .../metastore/DatasetStoreDatasetFinder.java    |  10 +-
 .../apache/gobblin/metastore/StateStore.java    |   2 +-
 .../metadata/StateStoreEntryManager.java        |   1 +
 .../src/main/resources/migrationConfig          |  25 ++++
 .../gobblin/runtime/StateStoreMigrationCli.java | 114 +++++++++++++++++++
 6 files changed, 151 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-core/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index ee7a77c..09eb38b 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -19,9 +19,10 @@ apply plugin: 'java'
 
 dependencies {
   compile project(":gobblin-api")
-  compile project(":gobblin-core-base")
   compile project(":gobblin-tunnel")
   compile project(":gobblin-utility")
+  compile project(":gobblin-metastore")
+  compile project(":gobblin-core-base")
   compile project(":gobblin-metrics-libs:gobblin-metrics")
   compile project(":gobblin-modules:gobblin-avro-json")
   compile project(":gobblin-modules:gobblin-metadata")

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
index 75d083d..51d03ca 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
@@ -29,6 +29,7 @@ import org.apache.gobblin.metastore.predicates.DatasetPredicate;
 import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -54,8 +55,12 @@ public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDat
     this.predicate = buildPredicate();
   }
 
+  /**
+   * Convenience constructor that delegates to the {@code (FileSystem, Properties)} constructor,
+   * passing the {@link FileSystem} obtained from a freshly created Hadoop {@link Configuration}.
+   *
+   * @param props properties forwarded unchanged to the delegated constructor.
+   * @throws IOException as declared; the {@link FileSystem} lookup and the delegated constructor
+   *                     are the only calls made here.
+   */
+  public DatasetStoreDatasetFinder(Properties props) throws IOException {
+    this(FileSystem.get(new Configuration()), props);
+  }
+
   private StateStorePredicate buildPredicate() {
-    StateStorePredicate predicate= null;
+    StateStorePredicate predicate = null;
     String storeName = null;
     String datasetUrn;
 
@@ -66,7 +71,8 @@ public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDat
 
     if (ConfigUtils.hasNonEmptyPath(this.config, DATASET_URN_FILTER)) {
       if (storeName == null) {
-        throw new IllegalArgumentException(DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined.");
+        throw new IllegalArgumentException(
+            DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined.");
       }
       datasetUrn = this.config.getString(DATASET_URN_FILTER);
       predicate = new DatasetPredicate(storeName, datasetUrn, x -> true);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 46c2aa8..425914a 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -202,7 +202,7 @@ public interface StateStore<T extends State> {
       throws IOException;
 
   /**
-   * Gets metadata for all tables matching the input
+   * Gets entry managers for all tables matching the input
    * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions
    *                  of {@link StateStorePredicate}.
    * @return A list of all {@link StateStoreEntryManager}s matching the predicate.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
index c4c7796..33d404b 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
@@ -28,6 +28,7 @@ import lombok.Data;
 
 /**
  * Contains metadata about an entry in a {@link StateStore}.
+ * Exposes access to the {@link StateStore} that contains the entry.
  * @param <T> type of {@link State} that can be read from this entry.
  */
 @Data

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/resources/migrationConfig
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/resources/migrationConfig b/gobblin-metastore/src/main/resources/migrationConfig
new file mode 100644
index 0000000..c238980
--- /dev/null
+++ b/gobblin-metastore/src/main/resources/migrationConfig
@@ -0,0 +1,25 @@
+# A simple template of configuration for migration of state store.
+
+source: {
+	state.store.type:fs
+
+	# Path up to and including the store name. Do not append the table name.
+	state.store.dir: ""
+}
+
+
+destination: {
+	state.store.db.user :
+	state.store.db.table :
+	state.store.type : org.apache.gobblin.runtime.MysqlDatasetStateStoreFactory
+	state.store.db.url : ""
+	state.store.db.password : ""
+
+	#Required
+	encrypt.key.loc: ""
+}
+
+
+keepOldState:true
+
+jobName:TestJob
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
new file mode 100644
index 0000000..d4cb91e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.cli.CliApplication;
+import org.apache.gobblin.runtime.cli.CliObjectFactory;
+import org.apache.gobblin.runtime.cli.CliObjectSupport;
+import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsCliObjectFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+
+
+/**
+ * Command-line tool for state store migration, for users who want to change the storage medium
+ * that holds their job state.
+ *
+ * <p>The tool reads a configuration file containing a {@code source} block, a {@code destination}
+ * block and a {@code jobName}. It builds a {@link DatasetStateStore} for each side, copies the
+ * latest dataset states of the job from the source store to the destination store, and, when
+ * requested on the command line, deletes the source store afterwards.
+ *
+ * <p>The current implementation doesn't support data awareness on either the source or the target
+ * side, and it only migrates the latest state of a single job instead of all historical versions.
+ */
+@Slf4j
+@Alias(value = "stateMigration", description = "Command line tools for migrating state store")
+public class StateStoreMigrationCli implements CliApplication {
+  private static final String SOURCE_KEY = "source";
+  private static final String DESTINATION_KEY = "destination";
+  private static final String JOB_NAME_KEY = "jobName";
+
+  /**
+   * Returns the last path segment of {@code dirPath}, which is used as the store name.
+   *
+   * <p>Trailing {@code '/'} characters are ignored, so {@code "/a/b/store"} and
+   * {@code "/a/b/store/"} both yield {@code "store"}. Without this, a trailing slash would
+   * produce an empty store name.
+   *
+   * @param dirPath value of {@code state.store.dir} from the source configuration.
+   * @return the text after the last {@code '/'} once trailing slashes are stripped; the whole
+   *         (stripped) string if it contains no {@code '/'}.
+   */
+  private static String extractStoreName(String dirPath) {
+    int end = dirPath.length();
+    while (end > 0 && dirPath.charAt(end - 1) == '/') {
+      end--;
+    }
+    String trimmed = dirPath.substring(0, end);
+    return trimmed.substring(trimmed.lastIndexOf('/') + 1);
+  }
+
+  /**
+   * Runs the migration.
+   *
+   * @param args command-line arguments; {@code args[0]} is used as the usage string and the
+   *             remaining arguments (from offset 1) are parsed into a {@link Command}.
+   * @throws Exception if the arguments cannot be parsed, the configuration file cannot be read or
+   *                   is missing a required entry, or reading/persisting dataset states fails.
+   *                   An {@link IOException} raised while deleting the source store is logged
+   *                   and not rethrown.
+   */
+  @Override
+  public void run(String[] args) throws Exception {
+    Preconditions.checkArgument(args != null && args.length > 0,
+        "Expected at least the command alias as the first argument.");
+    CliObjectFactory<Command> factory = new ConstructorAndPublicMethodsCliObjectFactory<>(Command.class);
+    Command command = factory.buildObject(args, 1, true, args[0]);
+
+    // The FileSystem itself is intentionally left open; only the stream on the config file is
+    // owned by this method and must be released.
+    FileSystem fs = FileSystem.get(new Configuration());
+    Config config;
+    try (FSDataInputStream inputStream = fs.open(command.path);
+        InputStreamReader reader = new InputStreamReader(inputStream, Charset.defaultCharset())) {
+      config = ConfigFactory.parseReader(reader);
+    }
+
+    // Validate the required entries before any of them is dereferenced.
+    Preconditions.checkNotNull(config.getObject(SOURCE_KEY));
+    Preconditions.checkNotNull(config.getObject(DESTINATION_KEY));
+    Preconditions.checkNotNull(config.getString(JOB_NAME_KEY));
+
+    String jobName = config.getString(JOB_NAME_KEY);
+    String storeName = extractStoreName(config.getConfig(SOURCE_KEY).getString(STATE_STORE_ROOT_DIR_KEY));
+
+    DatasetStateStore dstDatasetStateStore =
+        DatasetStateStore.buildDatasetStateStore(config.getConfig(DESTINATION_KEY));
+    DatasetStateStore srcDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(SOURCE_KEY));
+
+    Map<String, JobState.DatasetState> map = srcDatasetStateStore.getLatestDatasetStatesByUrns(jobName);
+    for (Map.Entry<String, JobState.DatasetState> entry : map.entrySet()) {
+      dstDatasetStateStore.persistDatasetState(entry.getKey(), entry.getValue());
+    }
+
+    if (command.deleteSourceStateStore) {
+      try {
+        srcDatasetStateStore.delete(storeName);
+      } catch (IOException ioe) {
+        // Best effort: the states have already been copied, so a failed cleanup is not fatal.
+        log.warn("Failed to delete the source state store " + storeName
+            + "; it may have already been deleted.", ioe);
+      }
+    }
+  }
+
+  /**
+   * Command-line options of the migration tool.
+   *
+   * <p>This class has to be public static so that it can be accessed by
+   * {@link ConstructorAndPublicMethodsCliObjectFactory#inferConstructorOptions}.
+   */
+  public static class Command {
+
+    private final Path path;
+    private boolean deleteSourceStateStore = false;
+
+    /**
+     * @param path location of the migration configuration file, exposed on the command line as
+     *             {@code configPath}.
+     */
+    @CliObjectSupport(argumentNames = "configPath")
+    public Command(String path) throws URISyntaxException, IOException {
+      this.path = new Path(path);
+    }
+
+    /**
+     * Requests deletion of the source state store once the migration has completed.
+     */
+    public void deleteSourceStateStore() {
+      this.deleteSourceStateStore = true;
+    }
+  }
+}