You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/13 00:10:15 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-729] Add version strategy support for HiveCopyDataset

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b4cd9e  [GOBBLIN-729] Add version strategy support for HiveCopyDataset
5b4cd9e is described below

commit 5b4cd9e241e6cf00bf72645e0ca35a132c36089e
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Fri Apr 12 17:10:09 2019 -0700

    [GOBBLIN-729] Add version strategy support for HiveCopyDataset
    
    Closes #2596 from yukuai518/hivest
---
 .../management/copy/hive/HiveCopyEntityHelper.java | 50 ++++++++++++++++++----
 .../copy/hive/HiveLocationDescriptor.java          | 15 +++++++
 .../copy/hive/HiveCopyEntityHelperTest.java        | 15 ++++++-
 3 files changed, 70 insertions(+), 10 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 9553959..019667d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -134,7 +134,7 @@ public class HiveCopyEntityHelper {
       DeregisterFileDeleteMethod.NO_DELETE;
 
   /**
-   * Config key to specify if {@link IMetaStoreClient }'s filtering method {@link listPartitionsByFilter} is not enough
+   * Config key to specify if {@link IMetaStoreClient }'s filtering method {@link IMetaStoreClient#listPartitionsByFilter} is not enough
    * for filtering out specific partitions.
    * For example, if you specify "Path" as the filter type and "Hourly" as the filtering condition,
    * partitions with Path containing '/Hourly/' will be kept.
@@ -187,7 +187,7 @@ public class HiveCopyEntityHelper {
   private final Optional<CommitStep> tableRegistrationStep;
   private final Map<List<String>, Partition> sourcePartitions;
   private final Map<List<String>, Partition> targetPartitions;
-
+  private final boolean enforceFileSizeMatch;
   private final EventSubmitter eventSubmitter;
   @Getter
   protected final HiveTargetPathHelper targetPathHelper;
@@ -265,6 +265,7 @@ public class HiveCopyEntityHelper {
       this.targetFs = targetFs;
 
       this.targetPathHelper = new HiveTargetPathHelper(this.dataset);
+      this.enforceFileSizeMatch = configuration.isEnforceFileLengthMatch();
       this.hiveRegProps = new HiveRegProps(new State(this.dataset.getProperties()));
       this.targetURI = Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_METASTORE_URI_KEY));
       this.targetClientPool = HiveMetastoreClientPool.get(this.dataset.getProperties(), this.targetURI);
@@ -587,8 +588,29 @@ public class HiveCopyEntityHelper {
       HiveLocationDescriptor desiredTargetLocation, Optional<HiveLocationDescriptor> currentTargetLocation,
       Optional<Partition> partition, MultiTimingEvent multiTimer, HiveCopyEntityHelper helper) throws IOException {
 
+    // populate version strategy before analyzing diffs
+    sourceLocation.populateDataFileVersionStrategy();
+    desiredTargetLocation.populateDataFileVersionStrategy();
+
     DiffPathSet.DiffPathSetBuilder builder = DiffPathSet.builder();
 
+    // check the strategy is not empty
+    if (!sourceLocation.versionStrategy.isPresent() || !desiredTargetLocation.versionStrategy.isPresent()) {
+      log.warn("Version strategy doesn't exist ({},{}), cannot handle copy.",
+          sourceLocation.versionStrategy.isPresent(),
+          desiredTargetLocation.versionStrategy.isPresent());
+      return builder.build();
+    }
+
+    // check if the src and dst strategy are the same
+    if (!sourceLocation.versionStrategy.get().getClass().getName()
+        .equals(desiredTargetLocation.versionStrategy.get().getClass().getName())) {
+      log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
+          sourceLocation.versionStrategy.get().getClass().getName(),
+          desiredTargetLocation.versionStrategy.get().getClass().getName());
+      return builder.build();
+    }
+
     multiTimer.nextStage(Stages.SOURCE_PATH_LISTING);
     // These are the paths at the source
     Map<Path, FileStatus> sourcePaths = sourceLocation.getPaths();
@@ -616,8 +638,23 @@ public class HiveCopyEntityHelper {
       if (desiredTargetExistingPaths.containsKey(newPath)) {
         // If the file exists at the destination, check whether it should be replaced, if not, no need to copy
         FileStatus existingTargetStatus = desiredTargetExistingPaths.get(newPath);
-        if (!helper.shouldReplaceFile(existingTargetStatus, sourcePath)) {
-          shouldCopy = false;
+
+        Comparable srcVer = sourceLocation.versionStrategy.get().getVersion(sourcePath.getPath());
+        Comparable dstVer = desiredTargetLocation.versionStrategy.get().getVersion(existingTargetStatus.getPath());
+
+        // destination has higher version, skip the copy
+        if (srcVer.compareTo(dstVer) <= 0) {
+          if (!helper.isEnforceFileSizeMatch() || existingTargetStatus.getLen() == sourcePath.getLen()) {
+            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be skipped.",
+                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
+            shouldCopy = false;
+          } else {
+            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be skipped due to unmatched file length.",
+                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
+          }
+        } else {
+          log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
+              sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
         }
       }
       if (shouldCopy) {
@@ -663,11 +700,6 @@ public class HiveCopyEntityHelper {
     return builder.build();
   }
 
-  private static boolean shouldReplaceFile(FileStatus referencePath, FileStatus replacementFile) {
-    return replacementFile.getLen() != referencePath.getLen()
-        || referencePath.getModificationTime() < replacementFile.getModificationTime();
-  }
-
   private void checkPartitionedTableCompatibility(Table desiredTargetTable, Table existingTargetTable)
       throws IOException {
     if (!desiredTargetTable.getDataLocation().equals(existingTargetTable.getDataLocation())) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java
index d31c9cb..64969de 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,15 +31,19 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.InputFormat;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 
 import org.apache.gobblin.data.management.copy.RecursivePathFinder;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 
 /**
  * Contains data for a Hive location as well as additional data if {@link #HIVE_DATASET_COPY_ADDITIONAL_PATHS_RECURSIVELY_ENABLED} set to true.
  */
 @Data
+@Slf4j
 public class HiveLocationDescriptor {
   public static final String HIVE_DATASET_COPY_ADDITIONAL_PATHS_RECURSIVELY_ENABLED =
       HiveDatasetFinder.HIVE_DATASET_PREFIX + ".copy.additional.paths.recursively.enabled";
@@ -57,6 +62,16 @@ public class HiveLocationDescriptor {
   protected final InputFormat<?, ?> inputFormat;
   protected final FileSystem fileSystem;
   protected final Properties properties;
+  protected Optional<DataFileVersionStrategy> versionStrategy;
+
+  protected void populateDataFileVersionStrategy() {
+    try {
+      this.versionStrategy = Optional.of(DataFileVersionStrategy
+          .instantiateDataFileVersionStrategy(fileSystem, ConfigUtils.propertiesToConfig(properties)));
+    } catch (IOException e) {
+      log.error("Cannot generate version strategy due to {}", e);
+    }
+  }
 
   public Map<Path, FileStatus> getPaths() throws IOException {
 
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index 2f252cc..91a2e67 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -39,6 +40,8 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import lombok.AllArgsConstructor;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -77,6 +80,7 @@ public class HiveCopyEntityHelperTest {
 
     MultiTimingEvent timer = Mockito.mock(MultiTimingEvent.class);
     HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+    Mockito.when(helper.isEnforceFileSizeMatch()).thenReturn(true);
     HiveTargetPathHelper targetPathHelper = Mockito.mock(HiveTargetPathHelper.class);
     Mockito.when(targetPathHelper
         .getTargetPath(Mockito.any(Path.class), Mockito.any(FileSystem.class), Mockito.any(Optional.class),
@@ -139,6 +143,7 @@ public class HiveCopyEntityHelperTest {
     HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
     MultiTimingEvent timer = Mockito.mock(MultiTimingEvent.class);
     HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+    Mockito.when(helper.isEnforceFileSizeMatch()).thenReturn(true);
     HiveTargetPathHelper targetPathHelper = Mockito.mock(HiveTargetPathHelper.class);
     Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
     Mockito.when(hiveDataset.getTable()).thenReturn(table);
@@ -200,6 +205,7 @@ public class HiveCopyEntityHelperTest {
     HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
     MultiTimingEvent timer = Mockito.mock(MultiTimingEvent.class);
     HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+    Mockito.when(helper.isEnforceFileSizeMatch()).thenReturn(true);
     HiveTargetPathHelper targetPathHelper = Mockito.mock(HiveTargetPathHelper.class);
     Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
     Mockito.when(hiveDataset.getTable()).thenReturn(table);
@@ -358,7 +364,7 @@ public class HiveCopyEntityHelperTest {
     Map<Path, FileStatus> paths;
 
     public TestLocationDescriptor(Map<Path, FileStatus> paths) {
-      super(null, null, null, null);
+      super(null, null, new TestLocationFs(paths), new Properties());
       this.paths = paths;
     }
 
@@ -369,4 +375,11 @@ public class HiveCopyEntityHelperTest {
     }
   }
 
+  @AllArgsConstructor
+  class TestLocationFs extends LocalFileSystem {
+    private Map<Path, FileStatus> paths;
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return paths.get(f);
+    }
+  }
 }