You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/02/17 18:42:30 UTC
[gobblin] branch master updated: [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (#3459)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 91a2d1c [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (#3459)
91a2d1c is described below
commit 91a2d1cc679a8285f7b05f76d16c17eb20206525
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Feb 17 10:42:26 2022 -0800
[GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (#3459)
* Change hive table location and partition check to validate using FS resolvePath to resolve logical paths
* Add tests for Unpartitioned file set
* Address review, add additional throw if locations mismatch for partition location validation
* Fix checkstyles again
* allow partial success policy for workunits
---
.../management/copy/hive/HiveCopyEntityHelper.java | 6 ++--
.../management/copy/hive/HivePartitionFileSet.java | 20 +++++++----
.../data/management/copy/hive/HiveUtils.java | 17 +++++++++
.../copy/hive/UnpartitionedTableFileSet.java | 6 ++--
.../copy/hive/UnpartitionedTableFileSetTest.java | 40 ++++++++++++++++++++++
5 files changed, 78 insertions(+), 11 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 1403e20..a1ce36c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -750,9 +750,9 @@ public class HiveCopyEntityHelper {
private void checkPartitionedTableCompatibility(Table desiredTargetTable, Table existingTargetTable)
throws IOException {
- if (!desiredTargetTable.getDataLocation().equals(existingTargetTable.getDataLocation())) {
- throw new HiveTableLocationNotMatchException(desiredTargetTable.getDataLocation(),
- existingTargetTable.getDataLocation());
+ // Reject only when the locations still differ after resolution through the target filesystem
+ if (!HiveUtils.areTablePathsEquivalent(this.targetFs, desiredTargetTable.getDataLocation(), existingTargetTable.getDataLocation())) {
+ throw new HiveTableLocationNotMatchException(desiredTargetTable.getDataLocation(), existingTargetTable.getDataLocation());
}
if (desiredTargetTable.isPartitioned() != existingTargetTable.isPartitioned()) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 8f1f208..62ead53 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Properties;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -40,6 +41,8 @@ import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.spec.SimpleHiveSpec;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -99,7 +102,13 @@ public class HivePartitionFileSet extends HiveFileSet {
hiveCopyEntityHelper.getExistingEntityPolicy() != HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE_AND_PARTITIONS) {
log.error("Source and target partitions are not compatible. Aborting copy of partition " + this.partition,
ioe);
- return Lists.newArrayList();
+ // Skip this partition (return no copy entities) only when the job commit policy is COMMIT_SUCCESSFUL_TASKS; otherwise rethrow the incompatibility
+ if (ConfigUtils.getString(hiveCopyEntityHelper.getConfiguration().getConfig(), ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
+ JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.toString()).equals(JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS.toString())) { // NOTE(review): exact, case-sensitive match against toString(); every other policy (including any other partial-success policy) rethrows — confirm intended
+ return Lists.newArrayList();
+ } else {
+ throw ioe;
+ }
}
log.warn("Source and target partitions are not compatible. Will override target partition: " + ioe.getMessage());
log.debug("Incompatibility details: ", ioe);
@@ -194,12 +203,11 @@ public class HivePartitionFileSet extends HiveFileSet {
}
}
- private static void checkPartitionCompatibility(Partition desiredTargetPartition, Partition existingTargetPartition)
+ private void checkPartitionCompatibility(Partition desiredTargetPartition, Partition existingTargetPartition) // non-static: resolves locations via hiveCopyEntityHelper's target filesystem
throws IOException {
- if (!desiredTargetPartition.getDataLocation().equals(existingTargetPartition.getDataLocation())) {
- throw new IOException(
- String.format("Desired target location %s and already registered target location %s do not agree.",
- desiredTargetPartition.getDataLocation(), existingTargetPartition.getDataLocation()));
+ if (!HiveUtils.areTablePathsEquivalent(hiveCopyEntityHelper.getTargetFs(), desiredTargetPartition.getDataLocation(),
+ existingTargetPartition.getDataLocation())) {
+ throw new HiveTableLocationNotMatchException(desiredTargetPartition.getDataLocation(), existingTargetPartition.getDataLocation()); // NOTE(review): previously a plain IOException; confirm callers that caught it still catch this type (e.g. that it extends IOException)
}
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
index a982e8a..0bc57b4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.data.management.copy.hive;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.Set;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -167,4 +169,19 @@ public class HiveUtils {
public static boolean isPartitioned(Table table) {
return table.isPartitioned();
}
+
+ /** Returns whether two Hive table or partition locations denote the same location on the target filesystem.
+ * @param fs User configured filesystem of the target table
+ * @param userSpecifiedPath user specified path of the copy table location or partition
+ * @param existingTablePath path of an already registered Hive table or partition
+ * @return true if the paths are identical or {@link FileSystem#resolvePath(Path)} resolves them to the same path, false otherwise
+ */
+ public static boolean areTablePathsEquivalent(FileSystem fs, Path userSpecifiedPath, Path existingTablePath) throws IOException {
+ if (existingTablePath.equals(userSpecifiedPath)) { return true; } // no FS lookup needed; also holds when the location does not exist yet
+ try {
+ return fs.resolvePath(existingTablePath).equals(fs.resolvePath(userSpecifiedPath));
+ } catch (FileNotFoundException ignored) {
+ return false; // resolvePath requires an existing path, so differing paths where either one is missing are not equivalent
+ }
+ }
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index 86cc490..527fd12 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -64,11 +64,13 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
Optional<Table> existingTargetTable = this.helper.getExistingTargetTable();
if (existingTargetTable.isPresent()) {
- if (!this.helper.getTargetTable().getDataLocation().equals(existingTargetTable.get().getDataLocation())) {
+ // Apply the configured existing-entity policy when the desired and existing table locations do not resolve to the same path on the target FS
+ if (!HiveUtils.areTablePathsEquivalent(this.helper.getTargetFs(), this.helper.getTargetTable().getDataLocation(),
+ existingTargetTable.get().getDataLocation())) {
switch (this.helper.getExistingEntityPolicy()){
case UPDATE_TABLE:
// Update the location of files while keep the existing table entity.
- log.warn("Source table will not be deregistered while file locaiton has been changed, update source table's"
+ log.warn("Source table will not be deregistered while file location has been changed, update source table's" // NOTE(review): the continuation below appends " file location to" with no trailing space before the location
+ " file location to" + this.helper.getTargetTable().getDataLocation());
existingTargetTable = Optional.absent();
break ;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
index b3102e4..28ab815 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
@@ -18,13 +18,17 @@
package org.apache.gobblin.data.management.copy.hive;
import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.junit.Assert;
import org.mockito.Mockito;
import org.testng.annotations.Test;
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
import java.util.List;
@@ -44,11 +48,47 @@ public class UnpartitionedTableFileSetTest {
Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
Mockito.when(helper.getExistingTargetTable()).thenReturn(Optional.of(existingTargetTable));
Mockito.when(helper.getTargetTable()).thenReturn(table);
+ // Mock filesystem resolver
+ FileSystem mockFS = Mockito.mock(FileSystem.class);
+ Mockito.when(helper.getTargetFs()).thenReturn(mockFS);
+ Mockito.when(mockFS.resolvePath(Mockito.any())).then(returnsFirstArg());
+
+ Mockito.when(helper.getExistingEntityPolicy()).thenReturn(HiveCopyEntityHelper.ExistingEntityPolicy.ABORT);
+ MetricContext metricContext = MetricContext.builder("testUnpartitionedTableFileSet").build();
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext,"loc.nomatch.exp").build();
+ Mockito.when(helper.getEventSubmitter()).thenReturn(eventSubmitter);
+ UnpartitionedTableFileSet upts = new UnpartitionedTableFileSet("testLocationMatch",hiveDataset,helper);
+ List<CopyEntity> copyEntities = (List<CopyEntity>)upts.generateCopyEntities();
+ }
+
+ @Test
+ public void testHiveTableLocationMatchDifferentPathsResolved() throws Exception {
+ Path testPath = new Path("/testPath/db/table");
+ Path existingTablePath = new Path("/existing/testPath/db/table");
+ Table table = new Table("testDb","table1");
+ table.setDataLocation(testPath);
+ Table existingTargetTable = new Table("testDb","table1");
+ existingTargetTable.setDataLocation(existingTablePath);
+ HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
+ Mockito.when(hiveDataset.getTable()).thenReturn(table);
+ HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+ Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
+ Mockito.when(helper.getExistingTargetTable()).thenReturn(Optional.of(existingTargetTable));
+ Mockito.when(helper.getTargetTable()).thenReturn(table);
+ // The mocked FS resolves both differing paths to one location, so the location check should pass; the always-true fast table skip then short-circuits
+ Mockito.when(helper.getFastTableSkip()).thenReturn(Optional.of(Predicates.alwaysTrue()));
+ // Mock filesystem that resolves every path to the same location
+ FileSystem mockFS = Mockito.mock(FileSystem.class);
+ Mockito.when(helper.getTargetFs()).thenReturn(mockFS);
+ Mockito.when(mockFS.resolvePath(Mockito.any())).thenReturn(new Path("hdfs://testPath/db/table"));
+
Mockito.when(helper.getExistingEntityPolicy()).thenReturn(HiveCopyEntityHelper.ExistingEntityPolicy.ABORT);
MetricContext metricContext = MetricContext.builder("testUnpartitionedTableFileSet").build();
EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext,"loc.nomatch.exp").build();
Mockito.when(helper.getEventSubmitter()).thenReturn(eventSubmitter);
UnpartitionedTableFileSet upts = new UnpartitionedTableFileSet("testLocationMatch",hiveDataset,helper);
List<CopyEntity> copyEntities = (List<CopyEntity>)upts.generateCopyEntities();
+ // No copy entities: after the paths were accepted as equivalent, the always-true fast table skip predicate skipped the table
+ Assert.assertEquals(0, copyEntities.size());
}
}