You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/24 18:24:31 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-751] Make enforced file size matching to be configurable

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e986e  [GOBBLIN-751] Make enforced file size matching to be configurable
67e986e is described below

commit 67e986e92337fe41bcb975d6f212ab4582dbf4dd
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Apr 24 11:24:24 2019 -0700

    [GOBBLIN-751] Make enforced file size matching to be configurable
    
    Make enforced file size matching
    configurable.
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        - https://issues.apache.org/jira/browse/GOBBLIN-751
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
      To allow a safer rollout (selectively rolling out to a few
    datasets for validation first):
       (1) This PR makes 'enforced file size matching'
    configurable when data files are copied.
       (2) This PR also makes the dataFileVersionStrategy
    configurable per dataset during the publisher phase.
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [ ] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Make enforced file size matching
    configurable per ConfigBasedDataset
    
    Fix the spelling
    
    Closes #2616 from yukuai518/cuz
---
 .../gobblin/data/management/copy/CopyableFile.java |  8 +++--
 .../copy/publisher/CopyDataPublisher.java          | 24 +++++++++++----
 .../copy/replication/ConfigBasedDataset.java       | 36 ++++++++++++++--------
 .../copy/replication/ReplicationConfiguration.java | 17 ++++++++--
 .../copy/ConcurrentBoundedWorkUnitListTest.java    |  2 +-
 .../copy/CopySourcePrioritizationTest.java         |  2 +-
 .../data/management/copy/CopyableFileTest.java     |  8 ++---
 .../data/management/copy/CopyableFileUtils.java    |  4 +--
 .../copy/replication/ConfigBasedDatasetTest.java   |  6 ++--
 .../util/filesystem/DataFileVersionStrategy.java   |  3 +-
 .../filesystem/ModTimeDataFileVersionStrategy.java |  3 ++
 11 files changed, 77 insertions(+), 36 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 6c1093f..5fbae45 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -100,11 +100,14 @@ public class CopyableFile extends CopyEntity implements File {
   /** Timestamp of file as in upstream. */
   private long upstreamTimestamp;
 
+  private String dataFileVersionStrategy;
+
   @lombok.Builder(builderClassName = "Builder", builderMethodName = "_hiddenBuilder")
   public CopyableFile(FileStatus origin, Path destination, OwnerAndPermission destinationOwnerAndPermission,
       List<OwnerAndPermission> ancestorsOwnerAndPermission, byte[] checksum, PreserveAttributes preserve,
       String fileSet, long originTimestamp, long upstreamTimestamp, Map<String, String> additionalMetadata,
-      String datasetOutputPath) {
+      String datasetOutputPath,
+      String dataFileVersionStrategy) {
     super(fileSet, additionalMetadata);
     this.origin = origin;
     this.destination = destination;
@@ -112,6 +115,7 @@ public class CopyableFile extends CopyEntity implements File {
     this.ancestorsOwnerAndPermission = ancestorsOwnerAndPermission;
     this.checksum = checksum;
     this.preserve = preserve;
+    this.dataFileVersionStrategy = dataFileVersionStrategy;
     this.originTimestamp = originTimestamp;
     this.upstreamTimestamp = upstreamTimestamp;
     this.datasetOutputPath = datasetOutputPath;
@@ -262,7 +266,7 @@ public class CopyableFile extends CopyEntity implements File {
 
       return new CopyableFile(this.origin, this.destination, this.destinationOwnerAndPermission,
           this.ancestorsOwnerAndPermission, this.checksum, this.preserve, this.fileSet, this.originTimestamp,
-          this.upstreamTimestamp, this.additionalMetadata, this.datasetOutputPath);
+          this.upstreamTimestamp, this.additionalMetadata, this.datasetOutputPath, this.dataFileVersionStrategy);
     }
 
     private List<OwnerAndPermission> replicateAncestorsOwnerAndPermission(FileSystem originFs, Path originPath,
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 4f7c0ed..f983c2a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -36,10 +36,12 @@ import org.apache.hadoop.fs.Path;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -83,7 +85,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
   public boolean isThreadSafe() {
     return this.getClass() == CopyDataPublisher.class;
   }
-
+  private final FileSystem srcFs;
   private final FileSystem fs;
   protected final EventSubmitter eventSubmitter;
   protected final RecoveryHelper recoveryHelper;
@@ -128,8 +130,8 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
 
     Config config = ConfigUtils.propertiesToConfig(state.getProperties());
 
-    this.srcDataFileVersionStrategy = DataFileVersionStrategy
-        .instantiateDataFileVersionStrategy(HadoopUtils.getSourceFileSystem(state), config);
+    this.srcFs = HadoopUtils.getSourceFileSystem(state);
+    this.srcDataFileVersionStrategy = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs, config);
     this.dstDataFileVersionStrategy = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, config);
   }
 
@@ -231,11 +233,21 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
       if (copyEntity instanceof CopyableFile) {
         CopyableFile copyableFile = (CopyableFile) copyEntity;
+        DataFileVersionStrategy srcVS = this.srcDataFileVersionStrategy;
+        DataFileVersionStrategy dstVS = this.dstDataFileVersionStrategy;
+
+        // Prefer to use copyableFile's specific version strategy
+        if (copyableFile.getDataFileVersionStrategy() != null) {
+          Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of(
+              DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, copyableFile.getDataFileVersionStrategy()));
+          srcVS = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.srcFs, versionStrategyConfig);
+          dstVS = DataFileVersionStrategy.instantiateDataFileVersionStrategy(this.fs, versionStrategyConfig);
+        }
 
         if (copyableFile.getPreserve().preserve(PreserveAttributes.Option.VERSION)
-            && this.dstDataFileVersionStrategy.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) {
-          this.dstDataFileVersionStrategy.setVersion(copyableFile.getDestination(),
-              this.srcDataFileVersionStrategy.getVersion(copyableFile.getOrigin().getPath()));
+            && dstVS.hasCharacteristic(DataFileVersionStrategy.Characteristic.SETTABLE)) {
+          dstVS.setVersion(copyableFile.getDestination(),
+              srcVS.getVersion(copyableFile.getOrigin().getPath()));
         }
 
         if (wus.getWorkingState() == WorkingState.COMMITTED) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 83cd76e..4d99e3a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -48,7 +48,6 @@ import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
-import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -86,6 +85,9 @@ public class ConfigBasedDataset implements CopyableDataset {
   //Apply filter to directories
   private final boolean applyFilterToDirectories;
 
+  //Version strategy from config store
+  private Optional<String> versionStrategyFromCS = Optional.absent();
+
   public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute) {
     this.props = props;
     this.copyRoute = copyRoute;
@@ -96,8 +98,8 @@ public class ConfigBasedDataset implements CopyableDataset {
     this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
     this.applyFilterToDirectories =
         Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
-    this.srcDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
-    this.dstDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
+    this.srcDataFileVersionStrategy = initDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
+    this.dstDataFileVersionStrategy = initDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
   }
 
   public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) {
@@ -108,8 +110,8 @@ public class ConfigBasedDataset implements CopyableDataset {
     this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
     this.applyFilterToDirectories =
         Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
-    this.srcDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
-    this.dstDataFileVersionStrategy = getDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
+    this.srcDataFileVersionStrategy = initDataFileVersionStrategy(this.copyRoute.getCopyFrom(), rc, props);
+    this.dstDataFileVersionStrategy = initDataFileVersionStrategy(this.copyRoute.getCopyTo(), rc, props);
   }
 
   /**
@@ -117,7 +119,9 @@ public class ConfigBasedDataset implements CopyableDataset {
    *
    * @return the version strategy. Empty value when the version is not supported for this end point.
    */
-  private Optional<DataFileVersionStrategy> getDataFileVersionStrategy(EndPoint endPoint, ReplicationConfiguration rc, Properties props) {
+  private Optional<DataFileVersionStrategy> initDataFileVersionStrategy(EndPoint endPoint, ReplicationConfiguration rc, Properties props) {
+
+    // rc is the dataset config???
     if (!(endPoint instanceof HadoopFsEndPoint)) {
       log.warn("Data file version currently only handle the Hadoop Fs EndPoint replication");
       return Optional.absent();
@@ -129,14 +133,14 @@ public class ConfigBasedDataset implements CopyableDataset {
 
       // If configStore doesn't contain the strategy, check from job properties.
       // If no strategy is found, default to the modification time strategy.
-      Optional<String> versionStrategy = rc.getVersionStrategyFromConfigStore();
+      this.versionStrategyFromCS = rc.getVersionStrategyFromConfigStore();
+      String nonEmptyStrategy = versionStrategyFromCS.isPresent()? versionStrategyFromCS.get() :
+          props.getProperty(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, DataFileVersionStrategy.DEFAULT_DATA_FILE_VERSION_STRATEGY);
       Config versionStrategyConfig = ConfigFactory.parseMap(ImmutableMap.of(
-          DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, versionStrategy.isPresent()? versionStrategy.get() :
-              props.getProperty(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY,
-                  ModTimeDataFileVersionStrategy.Factory.class.getName())));
+          DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY, nonEmptyStrategy));
 
       DataFileVersionStrategy strategy = DataFileVersionStrategy.instantiateDataFileVersionStrategy(fs, versionStrategyConfig);
-      log.debug("{} has version strategy {}", hEndpoint.getClusterName());
+      log.debug("{} has version strategy {}", hEndpoint.getClusterName(), strategy.getClass().getName());
       return Optional.of(strategy);
     } catch (IOException e) {
       log.error("Version strategy cannot be created due to {}", e);
@@ -168,6 +172,10 @@ public class ConfigBasedDataset implements CopyableDataset {
   @Override
   public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration copyConfiguration)
       throws IOException {
+    boolean enforceFileSizeMatch = this.rc.getEnforceFileSizeMatchFromConfigStore().isPresent()?
+        this.rc.getEnforceFileSizeMatchFromConfigStore().get() :
+        copyConfiguration.isEnforceFileLengthMatch();
+
     List<CopyEntity> copyableFiles = Lists.newArrayList();
     EndPoint copyFromRaw = copyRoute.getCopyFrom();
     EndPoint copyToRaw = copyRoute.getCopyTo();
@@ -246,7 +254,7 @@ public class ConfigBasedDataset implements CopyableDataset {
 
         // destination has higher version, skip the copy
         if (srcVer.compareTo(dstVer) <= 0) {
-          if (!copyConfiguration.isEnforceFileLengthMatch() || copyToFileMap.get(newPath).getLen() == originFileStatus.getLen()) {
+          if (!enforceFileSizeMatch || copyToFileMap.get(newPath).getLen() == originFileStatus.getLen()) {
             log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be skipped.",
                 originFileStatus.getPath(), srcVer, copyToFileMap.get(newPath).getPath(), dstVer);
             shouldCopy = false;
@@ -270,7 +278,9 @@ public class ConfigBasedDataset implements CopyableDataset {
         }
         CopyableFile copyableFile = CopyableFile
             .fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
-            .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build();
+            .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString())
+            .dataFileVersionStrategy(this.versionStrategyFromCS.isPresent()? this.versionStrategyFromCS.get(): null)
+            .build();
         copyableFile.setFsDatasets(copyFromFs, copyToFs);
         copyableFiles.add(copyableFile);
       }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index fea9a35..be07d50 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -27,14 +27,12 @@ import java.util.Set;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
-import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
 
 import lombok.Getter;
 
@@ -123,6 +121,9 @@ public class ReplicationConfiguration {
   @Getter
   private final Optional<String> versionStrategyFromConfigStore;
 
+  @Getter
+  private final Optional<Boolean> enforceFileSizeMatchFromConfigStore;
+
   public static ReplicationConfiguration buildFromConfig(Config input)
       throws InstantiationException, IllegalAccessException, ClassNotFoundException {
     Preconditions.checkArgument(input != null, "can not build ReplicationConfig from null");
@@ -140,6 +141,7 @@ public class ReplicationConfiguration {
         .withCopyRouteGenerator(config)
         .withDeleteTarget(config)
         .withVersionStrategyFromConfigStore(config)
+        .withEnforceFileSizeMatchFromConfigStore(config)
         .build();
   }
 
@@ -153,6 +155,7 @@ public class ReplicationConfiguration {
     this.copyRouteGenerator = builder.copyRouteGenerator;
     this.deleteTargetIfNotExistOnSource = builder.deleteTargetIfNotExistOnSource;
     this.versionStrategyFromConfigStore = builder.versionStrategyFromConfigStore;
+    this.enforceFileSizeMatchFromConfigStore = builder.enforceFileMatchFromConfigStore;
   }
 
   private static class Builder {
@@ -181,6 +184,14 @@ public class ReplicationConfiguration {
 
     private Optional<String> versionStrategyFromConfigStore = Optional.absent();
 
+    private Optional<Boolean> enforceFileMatchFromConfigStore = Optional.absent();
+
+    public Builder withEnforceFileSizeMatchFromConfigStore(Config config) {
+      this.enforceFileMatchFromConfigStore = config.hasPath(CopyConfiguration.ENFORCE_FILE_LENGTH_MATCH)?
+          Optional.of(config.getBoolean(CopyConfiguration.ENFORCE_FILE_LENGTH_MATCH)) :
+          Optional.absent();
+      return this;
+    }
 
     public Builder withVersionStrategyFromConfigStore(Config config) {
       this.versionStrategyFromConfigStore = config.hasPath(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY)?
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
index faa39bb..100f736 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
@@ -145,7 +145,7 @@ public class ConcurrentBoundedWorkUnitListTest {
 
     return new CopyableFile(origin, targetPath, new OwnerAndPermission(null, null, null),
         Lists.<OwnerAndPermission>newArrayList(), null, PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps
-        .<String, String>newHashMap(), "");
+        .<String, String>newHashMap(), "", null);
 
   }
 
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
index 5fdc532..071e72c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
@@ -220,7 +220,7 @@ public class CopySourcePrioritizationTest {
   private static CopyableFile createCopyableFile(String path, String fileSet) {
     return new CopyableFile(new FileStatus(0, false, 0, 0, 0, new Path(path)), new Path(path),
         new OwnerAndPermission("owner", "group", FsPermission.getDefault()), null, null,
-        PreserveAttributes.fromMnemonicString(""), fileSet, 0, 0, Maps.<String, String>newHashMap(), "");
+        PreserveAttributes.fromMnemonicString(""), fileSet, 0, 0, Maps.<String, String>newHashMap(), "", null);
   }
 
   public static class MyPrioritizer implements FileSetComparator {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
index 65425fa..5c8ac96 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
@@ -55,7 +55,7 @@ public class CopyableFileTest {
             new OwnerAndPermission("owner", "group", FsPermission.getDefault()),
             Lists.newArrayList(new OwnerAndPermission("owner2", "group2", FsPermission.getDefault())),
             "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps
-            .<String, String>newHashMap(), "");
+            .<String, String>newHashMap(), "", null);
 
     DatasetDescriptor dataset = new DatasetDescriptor("hive", "db.table");
     PartitionDescriptor descriptor = new PartitionDescriptor("datepartition=2018/09/05", dataset);
@@ -74,7 +74,7 @@ public class CopyableFileTest {
         new CopyableFile(null, null, new OwnerAndPermission("owner", "group",
             FsPermission.getDefault()), Lists.newArrayList(new OwnerAndPermission(null, "group2", FsPermission
             .getDefault())), "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0,
-            Maps.<String, String>newHashMap(), "");
+            Maps.<String, String>newHashMap(), "", null);
 
     String serialized = CopyEntity.serialize(copyableFile);
     CopyEntity deserialized = CopyEntity.deserialize(serialized);
@@ -114,7 +114,7 @@ public class CopyableFileTest {
     // Test when source file is not a directory
     FileStatus origin = new FileStatus(0l, false, 0, 0l, 0l, new Path(originPath));
     CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
-        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "", null);
     copyableFile.setFsDatasets(originFs, targetFs);
     DatasetDescriptor source = (DatasetDescriptor) copyableFile.getSourceData();
     Assert.assertEquals(source.getName(), "/data/databases/source");
@@ -130,7 +130,7 @@ public class CopyableFileTest {
     destinationPath = targetFsUri + destinationPath;
     origin = new FileStatus(0l, true, 0, 0l, 0l, new Path(originPath));
     copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
-        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "", null);
     copyableFile.setFsDatasets(originFs, targetFs);
     source = (DatasetDescriptor) copyableFile.getSourceData();
     Assert.assertEquals(source.getName(), "/data/databases/source/profile");
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
index d8cb938..4e85d5a 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
@@ -39,7 +39,7 @@ public class CopyableFileUtils {
     FileStatus status = new FileStatus(0l, false, 0, 0l, 0l, new Path(resourcePath));
 
     return new CopyableFile(status, new Path(getRandomPath()), null, null, null,
-        PreserveAttributes.fromMnemonicString(""), "", 0 ,0, Maps.<String, String>newHashMap(), "");
+        PreserveAttributes.fromMnemonicString(""), "", 0 ,0, Maps.<String, String>newHashMap(), "", null);
   }
 
   public static CopyableFile getTestCopyableFile() {
@@ -83,7 +83,7 @@ public class CopyableFileUtils {
     Path destinationRelativePath = new Path(relativePath);
 
     return new CopyableFile(status, new Path(destinationPath), ownerAndPermission, null, null,
-        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "", null);
   }
 
   private static String getRandomPath() {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
index 9a01447..0b1b523 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
@@ -47,7 +47,7 @@ import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 
 
 /**
@@ -90,8 +90,8 @@ public class ConfigBasedDatasetTest {
     ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
     Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
     Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
-    Mockito.when(mockRC.getVersionStrategyFromConfigStore()).thenReturn(Optional.of(ModTimeDataFileVersionStrategy.Factory.class.getName()));
-
+    Mockito.when(mockRC.getVersionStrategyFromConfigStore()).thenReturn(Optional.of(DataFileVersionStrategy.DEFAULT_DATA_FILE_VERSION_STRATEGY));
+    Mockito.when(mockRC.getEnforceFileSizeMatchFromConfigStore()).thenReturn(Optional.absent());
     HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
     Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));
     Mockito.when(copyFrom.getFsURI()).thenReturn(local);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
index ca237ee..75228da 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/DataFileVersionStrategy.java
@@ -65,12 +65,13 @@ public interface DataFileVersionStrategy<T extends Comparable<T> & Serializable>
   }
 
   String DATA_FILE_VERSION_STRATEGY_KEY = "org.apache.gobblin.dataFileVersionStrategy";
+  String DEFAULT_DATA_FILE_VERSION_STRATEGY = "modtime";
 
   /**
    * Instantiate a {@link DataFileVersionStrategy} according to input configuration.
    */
   static DataFileVersionStrategy instantiateDataFileVersionStrategy(FileSystem fs, Config config) throws IOException {
-    String versionStrategy = ConfigUtils.getString(config, DATA_FILE_VERSION_STRATEGY_KEY, ModTimeDataFileVersionStrategy.Factory.class.getName());
+    String versionStrategy = ConfigUtils.getString(config, DATA_FILE_VERSION_STRATEGY_KEY, DEFAULT_DATA_FILE_VERSION_STRATEGY);
 
     ClassAliasResolver resolver = new ClassAliasResolver(DataFileVersionFactory.class);
 
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
index 2f2fa22..46008f7 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ModTimeDataFileVersionStrategy.java
@@ -28,6 +28,8 @@ import com.typesafe.config.Config;
 
 import lombok.Data;
 
+import org.apache.gobblin.annotation.Alias;
+
 
 /**
  * An implementation of {@link DataFileVersionStrategy} that uses modtime as the file version.
@@ -37,6 +39,7 @@ import lombok.Data;
 @Data
 public class ModTimeDataFileVersionStrategy implements DataFileVersionStrategy<Long> {
 
+  @Alias(value = "modtime")
 	public static class Factory implements DataFileVersionStrategy.DataFileVersionFactory<Long> {
 		@Override
 		public DataFileVersionStrategy<Long> createDataFileVersionStrategy(FileSystem fs, Config config) {