You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/10/04 16:25:47 UTC
incubator-gobblin git commit: [GOBBLIN-270] Statestore Migration Script
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 9795cd58d -> 8af87cb78
[GOBBLIN-270] Statestore Migration Script
Closes #2122 from autumnust/statestoremigration
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8af87cb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8af87cb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8af87cb7
Branch: refs/heads/master
Commit: 8af87cb7807749b14640e4cc1feaeec173552d55
Parents: 9795cd5
Author: Lei Sun <au...@gmail.com>
Authored: Wed Oct 4 09:24:43 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Oct 4 09:24:43 2017 -0700
----------------------------------------------------------------------
gobblin-core/build.gradle | 3 +-
.../metastore/DatasetStoreDatasetFinder.java | 10 +-
.../apache/gobblin/metastore/StateStore.java | 2 +-
.../metadata/StateStoreEntryManager.java | 1 +
.../src/main/resources/migrationConfig | 25 ++++
.../gobblin/runtime/StateStoreMigrationCli.java | 114 +++++++++++++++++++
6 files changed, 151 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-core/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index ee7a77c..09eb38b 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -19,9 +19,10 @@ apply plugin: 'java'
dependencies {
compile project(":gobblin-api")
- compile project(":gobblin-core-base")
compile project(":gobblin-tunnel")
compile project(":gobblin-utility")
+ compile project(":gobblin-metastore")
+ compile project(":gobblin-core-base")
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-modules:gobblin-avro-json")
compile project(":gobblin-modules:gobblin-metadata")
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
index 75d083d..51d03ca 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java
@@ -29,6 +29,7 @@ import org.apache.gobblin.metastore.predicates.DatasetPredicate;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -54,8 +55,12 @@ public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDat
this.predicate = buildPredicate();
}
+ public DatasetStoreDatasetFinder(Properties props) throws IOException {
+ this(FileSystem.get(new Configuration()), props); // convenience ctor: uses the default FileSystem of a freshly loaded Hadoop Configuration
+ }
+
private StateStorePredicate buildPredicate() {
- StateStorePredicate predicate= null;
+ StateStorePredicate predicate = null;
String storeName = null;
String datasetUrn;
@@ -66,7 +71,8 @@ public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDat
if (ConfigUtils.hasNonEmptyPath(this.config, DATASET_URN_FILTER)) {
if (storeName == null) {
- throw new IllegalArgumentException(DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined.");
+ throw new IllegalArgumentException(
+ DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined.");
}
datasetUrn = this.config.getString(DATASET_URN_FILTER);
predicate = new DatasetPredicate(storeName, datasetUrn, x -> true);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 46c2aa8..425914a 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -202,7 +202,7 @@ public interface StateStore<T extends State> {
throws IOException;
/**
- * Gets metadata for all tables matching the input
+ * Gets entry managers for all tables matching the input predicate.
* @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions
* of {@link StateStorePredicate}.
* @return A list of all {@link StateStoreEntryManager}s matching the predicate.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
index c4c7796..33d404b 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
@@ -28,6 +28,7 @@ import lombok.Data;
/**
* Contains metadata about an entry in a {@link StateStore}.
+ * Exposes access to the {@link StateStore} that contains the entry.
* @param <T> type of {@link State} that can be read from this entry.
*/
@Data
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/resources/migrationConfig
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/resources/migrationConfig b/gobblin-metastore/src/main/resources/migrationConfig
new file mode 100644
index 0000000..c238980
--- /dev/null
+++ b/gobblin-metastore/src/main/resources/migrationConfig
@@ -0,0 +1,25 @@
+# A simple template of configuration for migration of state store.
+
+source: {
+ state.store.type:fs
+
+ # Path up to and including the store name, without a trailing '/'. Don't add a table name.
+ state.store.dir: ""
+}
+
+
+destination: {
+ state.store.db.user : ""
+ state.store.db.table : ""
+ state.store.type : org.apache.gobblin.runtime.MysqlDatasetStateStoreFactory
+ state.store.db.url : ""
+ state.store.db.password : ""
+
+ #Required
+ encrypt.key.loc: ""
+}
+
+# NOTE: not read by StateStoreMigrationCli; use its deleteSourceStateStore option to remove the source store.
+keepOldState:true
+# Name of the job whose latest dataset states are migrated.
+jobName:TestJob
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
new file mode 100644
index 0000000..d4cb91e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.cli.CliApplication;
+import org.apache.gobblin.runtime.cli.CliObjectFactory;
+import org.apache.gobblin.runtime.cli.CliObjectSupport;
+import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsCliObjectFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+
+
+/**
+ * A script used for state store migration:
+ * In the case that users are willing to change the storage medium of job state due to some reasons.
+ *
+ * Current implementation doesn't support data awareness on either source or target side.
+ * And only migrate a single job state instead of migrating all history versions.
+ */
+@Slf4j
+@Alias(value = "stateMigration", description = "Command line tools for migrating state store")
+public class StateStoreMigrationCli implements CliApplication {
+ private static final String SOURCE_KEY = "source";
+ private static final String DESTINATION_KEY = "destination";
+ private static final String JOB_NAME_KEY = "jobName";
+
+ /**
+ * Assume that there's no additional '/' in the end of state.store.dir's value
+ */
+ private String extractStoreName(String dirPath) {
+ return dirPath.substring(dirPath.lastIndexOf('/') + 1);
+ }
+
+ @Override
+ public void run(String[] args) throws Exception {
+ CliObjectFactory<Command> factory = new ConstructorAndPublicMethodsCliObjectFactory<>(Command.class);
+ Command command = factory.buildObject(args, 1, true, args[0]);
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ FSDataInputStream inputStream = fs.open(command.path);
+ Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
+ String storeName = this.extractStoreName(config.getConfig(SOURCE_KEY).getString(STATE_STORE_ROOT_DIR_KEY));
+
+ Preconditions.checkNotNull(config.getObject(SOURCE_KEY));
+ Preconditions.checkNotNull(config.getObject(DESTINATION_KEY));
+ Preconditions.checkNotNull(config.getString(JOB_NAME_KEY));
+
+ DatasetStateStore dstDatasetStateStore =
+ DatasetStateStore.buildDatasetStateStore(config.getConfig(DESTINATION_KEY));
+ DatasetStateStore srcDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(SOURCE_KEY));
+
+ Map<String, JobState.DatasetState> map =
+ srcDatasetStateStore.getLatestDatasetStatesByUrns(config.getString(JOB_NAME_KEY));
+ for (Map.Entry<String, JobState.DatasetState> entry : map.entrySet()) {
+ dstDatasetStateStore.persistDatasetState(entry.getKey(), entry.getValue());
+ }
+ if (command.deleteSourceStateStore) {
+ try {
+ srcDatasetStateStore.delete(storeName);
+ } catch (IOException ioe) {
+ log.warn("The source state store has been deleted.", ioe);
+ }
+ }
+ }
+
+ /**
+ * This class has to been public static for being accessed by
+ * {@link ConstructorAndPublicMethodsCliObjectFactory#inferConstructorOptions}
+ */
+ public static class Command {
+
+ private final Path path;
+ private boolean deleteSourceStateStore = false;
+
+ @CliObjectSupport(argumentNames = "configPath")
+ public Command(String path) throws URISyntaxException, IOException {
+ this.path = new Path(path);
+ }
+
+ public void deleteSourceStateStore() {
+ this.deleteSourceStateStore = true;
+ }
+ }
+}