You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 00:57:51 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-712] Add
version strategy pickup for ConfigBasedDataset distcp workflow
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 31d85d7 [GOBBLIN-712] Add version strategy pickup for ConfigBasedDataset distcp workflow
31d85d7 is described below
commit 31d85d7a6d5e4bd96f44184af7a49d699ff6f3d9
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Apr 3 17:57:35 2019 -0700
[GOBBLIN-712] Add version strategy pickup for ConfigBasedDataset distcp workflow
Closes #2579 from yukuai518/vs
---
.../data/management/copy/CopyConfiguration.java | 8 +-
.../copy/replication/ConfigBasedDataset.java | 87 ++++++++++++++++++++--
.../copy/replication/ReplicationConfiguration.java | 20 +++++
.../copy/replication/ConfigBasedDatasetTest.java | 2 +
4 files changed, 109 insertions(+), 8 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index c4d07e2..56d292c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -23,7 +23,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import org.apache.gobblin.util.request_allocation.ConcurrentBoundedPriorityIterable;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -59,6 +59,8 @@ public class CopyConfiguration {
public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories";
public static final String APPLY_FILTER_TO_DIRECTORIES = COPY_PREFIX + ".applyFilterToDirectories";
+ public static final String ENFORCE_FILE_LENGTH_MATCH = COPY_PREFIX + "enforce.fileLength.match"; // NOTE(review): the sibling keys above put a "." between COPY_PREFIX and the suffix; this one does not — confirm the resulting key is intended
+ public static final String DEFAULT_ENFORCE_FILE_LENGTH_MATCH = "true"; // passed as the fallback value when the property is read via PropertiesUtils.getPropAsBoolean
public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias";
public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy";
@@ -95,7 +97,7 @@ public class CopyConfiguration {
private final Config config;
private final boolean abortOnSingleDatasetFailure;
-
+ private final boolean enforceFileLengthMatch;
public static class CopyConfigurationBuilder {
private PreserveAttributes preserve;
@@ -133,7 +135,7 @@ public class CopyConfiguration {
this.prioritizer = Optional.absent();
}
this.maxToCopy = CopyResourcePool.fromConfig(ConfigUtils.getConfigOrEmpty(this.config, MAX_COPY_PREFIX));
-
+ this.enforceFileLengthMatch = PropertiesUtils.getPropAsBoolean(properties, ENFORCE_FILE_LENGTH_MATCH, DEFAULT_ENFORCE_FILE_LENGTH_MATCH);
this.storeRejectedRequestsSetting =
properties.getProperty(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, DEFAULT_STORE_REJECTED_REQUESTS);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index cc893a6..29d8276 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -29,10 +29,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
@@ -44,6 +47,9 @@ import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
+
import lombok.extern.slf4j.Slf4j;
@@ -70,6 +76,8 @@ public class ConfigBasedDataset implements CopyableDataset {
private String datasetURN;
private boolean watermarkEnabled;
private final PathFilter pathFilter;
+ private final Optional<DataFileVersionStrategy> srcDataFileVersionStrategy;
+ private final Optional<DataFileVersionStrategy> dstDataFileVersionStrategy;
//Apply filter to directories
private final boolean applyFilterToDirectories;
@@ -84,6 +92,8 @@ public class ConfigBasedDataset implements CopyableDataset {
this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
this.applyFilterToDirectories =
Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
+ this.srcDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
+ this.dstDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
}
public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) {
@@ -94,6 +104,40 @@ public class ConfigBasedDataset implements CopyableDataset {
this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
this.applyFilterToDirectories =
Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
+ this.srcDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
+ this.dstDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
+ }
+
+  /**
+   * Get the version strategy that can retrieve the data file version from the end point.
+   *
+   * <p>The strategy setting is looked up in the config store first (via {@code rc}), then in the job
+   * {@code props}; when neither provides one, {@link ModTimeDataFileVersionStrategy.Factory} is used.
+   *
+   * @param endPoint the replication end point; only {@link HadoopFsEndPoint} is handled
+   * @param rc the replication configuration, consulted first for the strategy setting
+   * @param props the job properties, consulted when the config store has no strategy setting
+   * @return the version strategy. Empty value when the version is not supported for this end point,
+   *         or when the strategy could not be created because of an {@link IOException}.
+   */
+  private Optional<DataFileVersionStrategy> getDataFileVersionStrategy(EndPoint endPoint, ReplicationConfiguration rc, Properties props) {
+    if (!(endPoint instanceof HadoopFsEndPoint)) {
+      log.warn("Data file version currently only handle the Hadoop Fs EndPoint replication");
+      return Optional.absent();
+    }
+    Configuration conf = HadoopUtils.newConfiguration();
+    try {
+      HadoopFsEndPoint hEndpoint = (HadoopFsEndPoint) endPoint;
+      FileSystem fs = FileSystem.get(hEndpoint.getFsURI(), conf);
+
+      // If configStore doesn't contain the strategy, check from job properties.
+      // If no strategy is found, default to the modification time strategy.
+      Optional<String> versionStrategy = rc.getVersionStrategyFromConfigStore();
+      Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of(
+          DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, versionStrategy.isPresent()? versionStrategy.get() :
+              props.getProperty(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY,
+                  ModTimeDataFileVersionStrategy.Factory.class.getName())));
+
+      DataFileVersionStrategy strategy = DataFileVersionStrategy.instantiateDataFileVersionStrategy(fs, versionStrategyConfig);
+      // Both placeholders need an argument: the cluster name and the concrete strategy class.
+      log.debug("{} has version strategy {}", hEndpoint.getClusterName(), strategy.getClass().getName());
+      return Optional.of(strategy);
+    } catch (IOException e) {
+      // The exception is passed as the trailing argument without a placeholder so the logger
+      // records it (with stack trace) instead of leaving a literal "{}" in the message.
+      log.error("Version strategy cannot be created", e);
+      return Optional.absent();
+    }
   }
private void calculateDatasetURN() {
@@ -128,6 +172,19 @@ public class ConfigBasedDataset implements CopyableDataset {
return copyableFiles;
}
+ if (!this.srcDataFileVersionStrategy.isPresent() || !this.dstDataFileVersionStrategy.isPresent()) {
+ log.warn("Version strategy doesn't exist, cannot handle copy");
+ return copyableFiles;
+ }
+
+ if (!this.srcDataFileVersionStrategy.get().getClass().getName()
+ .equals(this.dstDataFileVersionStrategy.get().getClass().getName())) {
+ log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
+ this.srcDataFileVersionStrategy.get().getClass().getName(),
+ this.dstDataFileVersionStrategy.get().getClass().getName());
+ return copyableFiles;
+ }
+
//For {@link HadoopFsEndPoint}s, set pathfilter and applyFilterToDirectories
HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
@@ -177,12 +234,32 @@ public class ConfigBasedDataset implements CopyableDataset {
watermarkMetadataCopied = true;
}
- // skip copy same file
- if (copyToFileMap.containsKey(newPath) && copyToFileMap.get(newPath).getLen() == originFileStatus.getLen()
- && copyToFileMap.get(newPath).getModificationTime() > originFileStatus.getModificationTime()) {
- log.debug("Copy from timestamp older than copy to timestamp, skipped copy {} for dataset with metadata {}",
- originFileStatus.getPath(), this.rc.getMetaData());
+
+ boolean shouldCopy = true;
+ if (copyToFileMap.containsKey(newPath)) {
+ Comparable srcVer = this.srcDataFileVersionStrategy.get().getVersion(originFileStatus.getPath());
+ Comparable dstVer = this.dstDataFileVersionStrategy.get().getVersion(copyToFileMap.get(newPath).getPath());
+
+ // destination version is not lower than the source version, so the copy may be skipped
+ if (srcVer.compareTo(dstVer) <= 0) {
+ if (!copyConfiguration.isEnforceFileLengthMatch() || copyToFileMap.get(newPath).getLen() == originFileStatus.getLen()) {
+ log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be skipped.",
+ originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
+ shouldCopy = false;
+ } else {
+ log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be skipped due to unmatched file length.",
+ originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
+ }
+ } else {
+ log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
+ originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
+ }
} else {
+ log.debug("Copy from src {} to dst {} is needed because dst doesn't contain the file",
+ originFileStatus.getPath(), copyToFileMap.get(newPath));
+ }
+
+ if (shouldCopy) {
// need to remove those files in the target File System
if (copyToFileMap.containsKey(newPath)) {
deletedPaths.add(newPath);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index b2df7f1..fea9a35 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -27,10 +27,15 @@ import java.util.Set;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
+
import lombok.Getter;
@@ -115,6 +120,9 @@ public class ReplicationConfiguration {
@Getter
private final boolean deleteTargetIfNotExistOnSource;
+ @Getter
+ private final Optional<String> versionStrategyFromConfigStore;
+
public static ReplicationConfiguration buildFromConfig(Config input)
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
Preconditions.checkArgument(input != null, "can not build ReplicationConfig from null");
@@ -131,6 +139,7 @@ public class ReplicationConfiguration {
.withDataFlowTopologyConfig(config)
.withCopyRouteGenerator(config)
.withDeleteTarget(config)
+ .withVersionStrategyFromConfigStore(config)
.build();
}
@@ -143,6 +152,7 @@ public class ReplicationConfiguration {
this.dataFlowToplogy = builder.dataFlowTopology;
this.copyRouteGenerator = builder.copyRouteGenerator;
this.deleteTargetIfNotExistOnSource = builder.deleteTargetIfNotExistOnSource;
+ this.versionStrategyFromConfigStore = builder.versionStrategyFromConfigStore;
}
private static class Builder {
@@ -169,6 +179,16 @@ public class ReplicationConfiguration {
private boolean deleteTargetIfNotExistOnSource = false;
+ private Optional<String> versionStrategyFromConfigStore = Optional.absent();
+
+
+    /**
+     * Picks up the data file version strategy setting from the given config.
+     *
+     * <p>When the config has no value under
+     * {@link DataFileVersionStrategy#DATA_FILE_VERSION_STRATEGY_KEY}, the setting is left absent.
+     *
+     * @param config the config to read the strategy setting from
+     * @return this builder
+     */
+    public Builder withVersionStrategyFromConfigStore(Config config) {
+      final String strategyKey = DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY;
+      if (config.hasPath(strategyKey)) {
+        this.versionStrategyFromConfigStore = Optional.of(config.getString(strategyKey));
+      } else {
+        this.versionStrategyFromConfigStore = Optional.absent();
+      }
+      return this;
+    }
+
public Builder withReplicationMetaData(ReplicationMetaData metaData) {
this.metaData = metaData;
return this;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
index f925243..9a01447 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
/**
@@ -89,6 +90,7 @@ public class ConfigBasedDatasetTest {
ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
+ Mockito.when(mockRC.getVersionStrategyFromConfigStore()).thenReturn(Optional.of(ModTimeDataFileVersionStrategy.Factory.class.getName()));
HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));