You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 00:57:51 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-712] Add version strategy pickup for ConfigBasedDataset distcp workflow

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 31d85d7  [GOBBLIN-712] Add version strategy pickup for ConfigBasedDataset distcp workflow
31d85d7 is described below

commit 31d85d7a6d5e4bd96f44184af7a49d699ff6f3d9
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Apr 3 17:57:35 2019 -0700

    [GOBBLIN-712] Add version strategy pickup for ConfigBasedDataset distcp workflow
    
    Closes #2579 from yukuai518/vs
---
 .../data/management/copy/CopyConfiguration.java    |  8 +-
 .../copy/replication/ConfigBasedDataset.java       | 87 ++++++++++++++++++++--
 .../copy/replication/ReplicationConfiguration.java | 20 +++++
 .../copy/replication/ConfigBasedDatasetTest.java   |  2 +
 4 files changed, 109 insertions(+), 8 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index c4d07e2..56d292c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -23,7 +23,7 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 
-import org.apache.gobblin.util.request_allocation.ConcurrentBoundedPriorityIterable;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,6 +59,8 @@ public class CopyConfiguration {
   public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories";
   public static final String APPLY_FILTER_TO_DIRECTORIES = COPY_PREFIX + ".applyFilterToDirectories";
 
+  public static final String ENFORCE_FILE_LENGTH_MATCH = COPY_PREFIX + ".enforce.fileLength.match";
+  public static final String DEFAULT_ENFORCE_FILE_LENGTH_MATCH = "true";
   public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias";
   public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy";
 
@@ -95,7 +97,7 @@ public class CopyConfiguration {
   private final Config config;
 
   private final boolean abortOnSingleDatasetFailure;
-
+  private final boolean enforceFileLengthMatch;
   public static class CopyConfigurationBuilder {
 
     private PreserveAttributes preserve;
@@ -133,7 +135,7 @@ public class CopyConfiguration {
         this.prioritizer = Optional.absent();
       }
       this.maxToCopy = CopyResourcePool.fromConfig(ConfigUtils.getConfigOrEmpty(this.config, MAX_COPY_PREFIX));
-
+      this.enforceFileLengthMatch = PropertiesUtils.getPropAsBoolean(properties, ENFORCE_FILE_LENGTH_MATCH, DEFAULT_ENFORCE_FILE_LENGTH_MATCH);
       this.storeRejectedRequestsSetting =
           properties.getProperty(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, DEFAULT_STORE_REJECTED_REQUESTS);
 
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index cc893a6..29d8276 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -29,10 +29,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
@@ -44,6 +47,9 @@ import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
+
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -70,6 +76,8 @@ public class ConfigBasedDataset implements CopyableDataset {
   private String datasetURN;
   private boolean watermarkEnabled;
   private final PathFilter pathFilter;
+  private final Optional<DataFileVersionStrategy> srcDataFileVersionStrategy;
+  private final Optional<DataFileVersionStrategy> dstDataFileVersionStrategy;
 
   //Apply filter to directories
   private final boolean applyFilterToDirectories;
@@ -84,6 +92,8 @@ public class ConfigBasedDataset implements CopyableDataset {
     this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
     this.applyFilterToDirectories =
         Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
+    this.srcDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
+    this.dstDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
   }
 
   public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) {
@@ -94,6 +104,40 @@ public class ConfigBasedDataset implements CopyableDataset {
     this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
     this.applyFilterToDirectories =
         Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
+    this.srcDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
+    this.dstDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
+  }
+
+  /**
+   * Get the version strategy that can retrieve the data file version from the end point.
+   *
+   * @return the version strategy. Empty value when the version is not supported for this end point.
+   */
+  private Optional<DataFileVersionStrategy> getDataFileVersionStrategy(EndPoint endPoint, ReplicationConfiguration rc, Properties props) {
+    if (!(endPoint instanceof HadoopFsEndPoint)) {
+      log.warn("Data file version strategy currently only handles Hadoop FS EndPoint replication");
+      return Optional.absent();
+    }
+    Configuration conf = HadoopUtils.newConfiguration();
+    try {
+      HadoopFsEndPoint hEndpoint = (HadoopFsEndPoint) endPoint;
+      FileSystem fs = FileSystem.get(hEndpoint.getFsURI(), conf);
+
+      // If configStore doesn't contain the strategy, check from job properties.
+      // If no strategy is found, default to the modification time strategy.
+      Optional<String> versionStrategy = rc.getVersionStrategyFromConfigStore();
+      Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of(
+          DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, versionStrategy.isPresent()? versionStrategy.get() :
+              props.getProperty(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY,
+                  ModTimeDataFileVersionStrategy.Factory.class.getName())));
+
+      DataFileVersionStrategy strategy = DataFileVersionStrategy.instantiateDataFileVersionStrategy(fs, versionStrategyConfig);
+      log.debug("{} has version strategy {}", hEndpoint.getClusterName(), strategy);
+      return Optional.of(strategy);
+    } catch (IOException e) {
+      log.error("Version strategy cannot be created", e);
+      return Optional.absent();
+    }
   }
 
   private void calculateDatasetURN() {
@@ -128,6 +172,19 @@ public class ConfigBasedDataset implements CopyableDataset {
       return copyableFiles;
     }
 
+    if (!this.srcDataFileVersionStrategy.isPresent() || !this.dstDataFileVersionStrategy.isPresent()) {
+      log.warn("Version strategy doesn't exist, cannot handle copy");
+      return copyableFiles;
+    }
+
+    if (!this.srcDataFileVersionStrategy.get().getClass().getName()
+        .equals(this.dstDataFileVersionStrategy.get().getClass().getName())) {
+      log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
+          this.srcDataFileVersionStrategy.get().getClass().getName(),
+          this.dstDataFileVersionStrategy.get().getClass().getName());
+      return copyableFiles;
+    }
+
     //For {@link HadoopFsEndPoint}s, set pathfilter and applyFilterToDirectories
     HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
     HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
@@ -177,12 +234,32 @@ public class ConfigBasedDataset implements CopyableDataset {
         watermarkMetadataCopied = true;
       }
 
-      // skip copy same file
-      if (copyToFileMap.containsKey(newPath) && copyToFileMap.get(newPath).getLen() == originFileStatus.getLen()
-          && copyToFileMap.get(newPath).getModificationTime() > originFileStatus.getModificationTime()) {
-        log.debug("Copy from timestamp older than copy to timestamp, skipped copy {} for dataset with metadata {}",
-            originFileStatus.getPath(), this.rc.getMetaData());
+
+      boolean shouldCopy = true;
+      if (copyToFileMap.containsKey(newPath)) {
+        Comparable srcVer = this.srcDataFileVersionStrategy.get().getVersion(originFileStatus.getPath());
+        Comparable dstVer = this.dstDataFileVersionStrategy.get().getVersion(copyToFileMap.get(newPath).getPath());
+
+        // destination has higher version, skip the copy
+        if (srcVer.compareTo(dstVer) <= 0) {
+          if (!copyConfiguration.isEnforceFileLengthMatch() || copyToFileMap.get(newPath).getLen() == originFileStatus.getLen()) {
+            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be skipped.",
+                originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
+            shouldCopy = false;
+          } else {
+            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be skipped due to unmatched file length.",
+                originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
+          }
+        } else {
+          log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
+              originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
+        }
       } else {
+        log.debug("Copy from src {} to dst {} is needed because dst doesn't contain the file",
+            originFileStatus.getPath(), newPath);
+      }
+
+      if (shouldCopy) {
         // need to remove those files in the target File System
         if (copyToFileMap.containsKey(newPath)) {
           deletedPaths.add(newPath);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index b2df7f1..fea9a35 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -27,10 +27,15 @@ import java.util.Set;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
+
 import lombok.Getter;
 
 
@@ -115,6 +120,9 @@ public class ReplicationConfiguration {
   @Getter
   private final boolean deleteTargetIfNotExistOnSource;
 
+  @Getter
+  private final Optional<String> versionStrategyFromConfigStore;
+
   public static ReplicationConfiguration buildFromConfig(Config input)
       throws InstantiationException, IllegalAccessException, ClassNotFoundException {
     Preconditions.checkArgument(input != null, "can not build ReplicationConfig from null");
@@ -131,6 +139,7 @@ public class ReplicationConfiguration {
         .withDataFlowTopologyConfig(config)
         .withCopyRouteGenerator(config)
         .withDeleteTarget(config)
+        .withVersionStrategyFromConfigStore(config)
         .build();
   }
 
@@ -143,6 +152,7 @@ public class ReplicationConfiguration {
     this.dataFlowToplogy = builder.dataFlowTopology;
     this.copyRouteGenerator = builder.copyRouteGenerator;
     this.deleteTargetIfNotExistOnSource = builder.deleteTargetIfNotExistOnSource;
+    this.versionStrategyFromConfigStore = builder.versionStrategyFromConfigStore;
   }
 
   private static class Builder {
@@ -169,6 +179,16 @@ public class ReplicationConfiguration {
 
     private boolean deleteTargetIfNotExistOnSource = false;
 
+    private Optional<String> versionStrategyFromConfigStore = Optional.absent();
+
+
+    public Builder withVersionStrategyFromConfigStore(Config config) {
+      this.versionStrategyFromConfigStore = config.hasPath(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY)?
+          Optional.of(config.getString(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY)) :
+          Optional.absent();
+      return this;
+    }
+
     public Builder withReplicationMetaData(ReplicationMetaData metaData) {
       this.metaData = metaData;
       return this;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
index f925243..9a01447 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
 
 
 /**
@@ -89,6 +90,7 @@ public class ConfigBasedDatasetTest {
     ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
     Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
     Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
+    Mockito.when(mockRC.getVersionStrategyFromConfigStore()).thenReturn(Optional.of(ModTimeDataFileVersionStrategy.Factory.class.getName()));
 
     HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
     Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));