Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/02/17 18:42:30 UTC

[gobblin] branch master updated: [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (#3459)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a2d1c  [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (#3459)
91a2d1c is described below

commit 91a2d1cc679a8285f7b05f76d16c17eb20206525
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Feb 17 10:42:26 2022 -0800

    [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (#3459)
    
    * Change hive table location and partition check to validate using FS resolvePath to resolve logical paths
    
    * Add tests for Unpartitioned file set
    
    * Address review, add additional throw if locations mismatch for partition location validation
    
    * Fix checkstyles again
    
    * allow partial success policy for workunits
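
    For illustration (not part of the commit): a minimal sketch of the FS-based path
    equivalence idea from the first bullet, assuming an HDFS deployment where
    fs.defaultFS points at the target cluster and the example path exists there; the
    concrete paths and class name are invented.

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class ResolvePathSketch {
          public static void main(String[] args) throws Exception {
            // Assumes fs.defaultFS points at the target cluster and /db/my_table exists there.
            FileSystem fs = FileSystem.get(new Configuration());

            Path registered = new Path("/db/my_table");                  // logical spelling from the metastore
            Path desired = new Path("hdfs://nn-host:8020/db/my_table");  // fully qualified spelling

            // Raw Path.equals is a string comparison, so different spellings of the
            // same location are "not equal" even though they point at the same data.
            System.out.println("Path.equals: " + registered.equals(desired));

            // FileSystem.resolvePath returns the fully qualified physical location
            // (following viewfs/federation mount points) and throws FileNotFoundException
            // if the path does not exist, so equivalent locations compare equal.
            System.out.println("resolved equal: "
                + fs.resolvePath(registered).equals(fs.resolvePath(desired)));
          }
        }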
---
 .../management/copy/hive/HiveCopyEntityHelper.java |  6 ++--
 .../management/copy/hive/HivePartitionFileSet.java | 20 +++++++----
 .../data/management/copy/hive/HiveUtils.java       | 17 +++++++++
 .../copy/hive/UnpartitionedTableFileSet.java       |  6 ++--
 .../copy/hive/UnpartitionedTableFileSetTest.java   | 40 ++++++++++++++++++++++
 5 files changed, 78 insertions(+), 11 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 1403e20..a1ce36c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -750,9 +750,9 @@ public class HiveCopyEntityHelper {
 
   private void checkPartitionedTableCompatibility(Table desiredTargetTable, Table existingTargetTable)
       throws IOException {
-    if (!desiredTargetTable.getDataLocation().equals(existingTargetTable.getDataLocation())) {
-      throw new HiveTableLocationNotMatchException(desiredTargetTable.getDataLocation(),
-          existingTargetTable.getDataLocation());
+
+    if (!HiveUtils.areTablePathsEquivalent(this.targetFs, desiredTargetTable.getDataLocation(), existingTargetTable.getDataLocation())) {
+      throw new HiveTableLocationNotMatchException(desiredTargetTable.getDataLocation(), existingTargetTable.getDataLocation());
     }
 
     if (desiredTargetTable.isPartitioned() != existingTargetTable.isPartitioned()) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 8f1f208..62ead53 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Properties;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -40,6 +41,8 @@ import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -99,7 +102,13 @@ public class HivePartitionFileSet extends HiveFileSet {
               hiveCopyEntityHelper.getExistingEntityPolicy() != HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE_AND_PARTITIONS) {
             log.error("Source and target partitions are not compatible. Aborting copy of partition " + this.partition,
                 ioe);
-            return Lists.newArrayList();
+            // Silence error and continue processing workunits if we allow partial success
+            if (ConfigUtils.getString(hiveCopyEntityHelper.getConfiguration().getConfig(), ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
+                JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.toString()).equals(JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS.toString())) {
+              return Lists.newArrayList();
+            } else {
+              throw ioe;
+            }
           }
           log.warn("Source and target partitions are not compatible. Will override target partition: " + ioe.getMessage());
           log.debug("Incompatibility details: ", ioe);
@@ -194,12 +203,11 @@ public class HivePartitionFileSet extends HiveFileSet {
     }
   }
 
-  private static void checkPartitionCompatibility(Partition desiredTargetPartition, Partition existingTargetPartition)
+  private void checkPartitionCompatibility(Partition desiredTargetPartition, Partition existingTargetPartition)
       throws IOException {
-    if (!desiredTargetPartition.getDataLocation().equals(existingTargetPartition.getDataLocation())) {
-      throw new IOException(
-          String.format("Desired target location %s and already registered target location %s do not agree.",
-              desiredTargetPartition.getDataLocation(), existingTargetPartition.getDataLocation()));
+    if (!HiveUtils.areTablePathsEquivalent(hiveCopyEntityHelper.getTargetFs(), desiredTargetPartition.getDataLocation(),
+        existingTargetPartition.getDataLocation())) {
+      throw new HiveTableLocationNotMatchException(desiredTargetPartition.getDataLocation(), existingTargetPartition.getDataLocation());
     }
   }
 }
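
For illustration (not part of the commit): the new branch above consults the job commit
policy before deciding whether to swallow the incompatibility error. Below is a standalone
sketch of that decision, using invented stand-ins for Gobblin's
ConfigurationKeys.JOB_COMMIT_POLICY_KEY and JobCommitPolicy constants; the real string
values may differ.

    import java.io.IOException;
    import java.util.Properties;

    public class PartialSuccessSketch {
      // Invented stand-ins; the actual key/value strings live in Gobblin's
      // ConfigurationKeys and JobCommitPolicy and may not match these literals.
      static final String COMMIT_POLICY_KEY = "job.commit.policy";
      static final String COMMIT_SUCCESSFUL_TASKS = "successful";

      static void handleIncompatiblePartition(Properties jobProps, IOException ioe) throws IOException {
        String policy = jobProps.getProperty(COMMIT_POLICY_KEY, "full");
        if (COMMIT_SUCCESSFUL_TASKS.equals(policy)) {
          // Partial success allowed: skip this partition and keep generating work units.
          System.out.println("Skipping incompatible partition: " + ioe.getMessage());
        } else {
          // Full success required: rethrow so the copy of this partition fails.
          throw ioe;
        }
      }

      public static void main(String[] args) throws IOException {
        Properties jobProps = new Properties();
        jobProps.setProperty(COMMIT_POLICY_KEY, COMMIT_SUCCESSFUL_TASKS); // hypothetical job config
        handleIncompatiblePartition(jobProps, new IOException("target partition location mismatch"));
      }
    }
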
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
index a982e8a..0bc57b4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -167,4 +169,19 @@ public class HiveUtils {
   public static boolean isPartitioned(Table table) {
     return table.isPartitioned();
   }
+
+  /**
+   * @param fs user-configured filesystem of the target table
+   * @param userSpecifiedPath user-specified path of the copy's table or partition location
+   * @param existingTablePath path of an already registered Hive table or partition
+   * @return true if the filesystem resolves them to be equivalent, false otherwise
+   */
+  public static boolean areTablePathsEquivalent(FileSystem fs, Path userSpecifiedPath, Path existingTablePath) throws IOException {
+    try {
+      return fs.resolvePath(existingTablePath).equals(fs.resolvePath(userSpecifiedPath));
+    } catch (FileNotFoundException e) {
+      // At least one of the paths failed to resolve (does not exist), so they cannot be equivalent
+      return false;
+    }
+  }
 }
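
As a usage note (an assumption-laden sketch, not part of the commit): the new
HiveUtils.areTablePathsEquivalent helper can be exercised directly with a mocked
FileSystem, in the same style as the UnpartitionedTableFileSetTest changes further
down. The test class name is invented and is assumed to live in the same package
as HiveUtils.

    import java.io.FileNotFoundException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Assert;
    import org.mockito.Mockito;
    import org.testng.annotations.Test;

    public class AreTablePathsEquivalentSketchTest {
      @Test
      public void resolvedPathsDecideEquivalence() throws Exception {
        FileSystem fs = Mockito.mock(FileSystem.class);
        Path logical = new Path("/db/table");
        Path qualified = new Path("hdfs://nn-host:8020/db/table");

        // Both spellings resolve to the same physical location -> equivalent.
        Mockito.when(fs.resolvePath(Mockito.any(Path.class)))
            .thenReturn(new Path("hdfs://nn-host:8020/db/table"));
        Assert.assertTrue(HiveUtils.areTablePathsEquivalent(fs, logical, qualified));

        // Re-stub so resolution fails: a missing path means "not equivalent", not an error.
        Mockito.when(fs.resolvePath(Mockito.any(Path.class)))
            .thenThrow(new FileNotFoundException("path does not exist"));
        Assert.assertFalse(HiveUtils.areTablePathsEquivalent(fs, logical, qualified));
      }
    }
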
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index 86cc490..527fd12 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -64,11 +64,13 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
 
     Optional<Table> existingTargetTable = this.helper.getExistingTargetTable();
     if (existingTargetTable.isPresent()) {
-      if (!this.helper.getTargetTable().getDataLocation().equals(existingTargetTable.get().getDataLocation())) {
+      // Consult the existing-entity policy when the user-defined copy location does not resolve to the pre-existing table path
+      if (!HiveUtils.areTablePathsEquivalent(this.helper.getTargetFs(), this.helper.getTargetTable().getDataLocation(),
+          existingTargetTable.get().getDataLocation())) {
         switch (this.helper.getExistingEntityPolicy()){
           case UPDATE_TABLE:
             // Update the location of files while keeping the existing table entity.
-            log.warn("Source table will not be deregistered while file locaiton has been changed, update source table's"
+            log.warn("Source table will not be deregistered while file location has been changed, update source table's"
                 + " file location to" + this.helper.getTargetTable().getDataLocation());
             existingTargetTable = Optional.absent();
             break ;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
index b3102e4..28ab815 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
@@ -18,13 +18,17 @@
 package org.apache.gobblin.data.management.copy.hive;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.junit.Assert;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
 
 import java.util.List;
 
@@ -44,11 +48,47 @@ public class UnpartitionedTableFileSetTest {
         Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
         Mockito.when(helper.getExistingTargetTable()).thenReturn(Optional.of(existingTargetTable));
         Mockito.when(helper.getTargetTable()).thenReturn(table);
+        // Mock filesystem resolver
+        FileSystem mockFS = Mockito.mock(FileSystem.class);
+        Mockito.when(helper.getTargetFs()).thenReturn(mockFS);
+        Mockito.when(mockFS.resolvePath(Mockito.any())).then(returnsFirstArg());
+
+        Mockito.when(helper.getExistingEntityPolicy()).thenReturn(HiveCopyEntityHelper.ExistingEntityPolicy.ABORT);
+        MetricContext metricContext = MetricContext.builder("testUnpartitionedTableFileSet").build();
+        EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext,"loc.nomatch.exp").build();
+        Mockito.when(helper.getEventSubmitter()).thenReturn(eventSubmitter);
+        UnpartitionedTableFileSet upts = new UnpartitionedTableFileSet("testLocationMatch",hiveDataset,helper);
+        List<CopyEntity> copyEntities = (List<CopyEntity>)upts.generateCopyEntities();
+    }
+
+    @Test
+    public void testHiveTableLocationMatchDifferentPathsResolved() throws Exception {
+        Path testPath = new Path("/testPath/db/table");
+        Path existingTablePath = new Path("/existing/testPath/db/table");
+        Table table = new Table("testDb","table1");
+        table.setDataLocation(testPath);
+        Table existingTargetTable = new Table("testDb","table1");
+        existingTargetTable.setDataLocation(existingTablePath);
+        HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
+        Mockito.when(hiveDataset.getTable()).thenReturn(table);
+        HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+        Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
+        Mockito.when(helper.getExistingTargetTable()).thenReturn(Optional.of(existingTargetTable));
+        Mockito.when(helper.getTargetTable()).thenReturn(table);
+        // Only verify that no copy entities are produced: the mocked FS resolves both paths to the same location, and the fast table skip predicate short-circuits the rest
+        Mockito.when(helper.getFastTableSkip()).thenReturn(Optional.of(Predicates.alwaysTrue()));
+        // Mock filesystem resolver
+        FileSystem mockFS = Mockito.mock(FileSystem.class);
+        Mockito.when(helper.getTargetFs()).thenReturn(mockFS);
+        Mockito.when(mockFS.resolvePath(Mockito.any())).thenReturn(new Path("hdfs://testPath/db/table"));
+
         Mockito.when(helper.getExistingEntityPolicy()).thenReturn(HiveCopyEntityHelper.ExistingEntityPolicy.ABORT);
         MetricContext metricContext = MetricContext.builder("testUnpartitionedTableFileSet").build();
         EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext,"loc.nomatch.exp").build();
         Mockito.when(helper.getEventSubmitter()).thenReturn(eventSubmitter);
         UnpartitionedTableFileSet upts = new UnpartitionedTableFileSet("testLocationMatch",hiveDataset,helper);
         List<CopyEntity> copyEntities = (List<CopyEntity>)upts.generateCopyEntities();
+        // Size should be 0 since fast table skip predicate is always true
+        Assert.assertEquals(copyEntities.size(), 0);
     }
 }