You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/13 00:10:15 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-729] Add version strategy support for HiveCopyDataset
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5b4cd9e [GOBBLIN-729] Add version strategy support for HiveCopyDataset
5b4cd9e is described below
commit 5b4cd9e241e6cf00bf72645e0ca35a132c36089e
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Fri Apr 12 17:10:09 2019 -0700
[GOBBLIN-729] Add version strategy support for HiveCopyDataset
Closes #2596 from yukuai518/hivest
---
.../management/copy/hive/HiveCopyEntityHelper.java | 50 ++++++++++++++++++----
.../copy/hive/HiveLocationDescriptor.java | 15 +++++++
.../copy/hive/HiveCopyEntityHelperTest.java | 15 ++++++-
3 files changed, 70 insertions(+), 10 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 9553959..019667d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -134,7 +134,7 @@ public class HiveCopyEntityHelper {
DeregisterFileDeleteMethod.NO_DELETE;
/**
- * Config key to specify if {@link IMetaStoreClient }'s filtering method {@link listPartitionsByFilter} is not enough
+ * Config key to specify if {@link IMetaStoreClient }'s filtering method {@link IMetaStoreClient#listPartitionsByFilter} is not enough
* for filtering out specific partitions.
* For example, if you specify "Path" as the filter type and "Hourly" as the filtering condition,
* partitions with Path containing '/Hourly/' will be kept.
@@ -187,7 +187,7 @@ public class HiveCopyEntityHelper {
private final Optional<CommitStep> tableRegistrationStep;
private final Map<List<String>, Partition> sourcePartitions;
private final Map<List<String>, Partition> targetPartitions;
-
+ private final boolean enforceFileSizeMatch;
private final EventSubmitter eventSubmitter;
@Getter
protected final HiveTargetPathHelper targetPathHelper;
@@ -265,6 +265,7 @@ public class HiveCopyEntityHelper {
this.targetFs = targetFs;
this.targetPathHelper = new HiveTargetPathHelper(this.dataset);
+ this.enforceFileSizeMatch = configuration.isEnforceFileLengthMatch();
this.hiveRegProps = new HiveRegProps(new State(this.dataset.getProperties()));
this.targetURI = Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_METASTORE_URI_KEY));
this.targetClientPool = HiveMetastoreClientPool.get(this.dataset.getProperties(), this.targetURI);
@@ -587,8 +588,29 @@ public class HiveCopyEntityHelper {
HiveLocationDescriptor desiredTargetLocation, Optional<HiveLocationDescriptor> currentTargetLocation,
Optional<Partition> partition, MultiTimingEvent multiTimer, HiveCopyEntityHelper helper) throws IOException {
+ // populate version strategy before analyzing diffs
+ sourceLocation.populateDataFileVersionStrategy();
+ desiredTargetLocation.populateDataFileVersionStrategy();
+
DiffPathSet.DiffPathSetBuilder builder = DiffPathSet.builder();
+ // check the strategy is not empty
+ if (!sourceLocation.versionStrategy.isPresent() || !desiredTargetLocation.versionStrategy.isPresent()) {
+ log.warn("Version strategy doesn't exist ({},{}), cannot handle copy.",
+ sourceLocation.versionStrategy.isPresent(),
+ desiredTargetLocation.versionStrategy.isPresent());
+ return builder.build();
+ }
+
+ // check if the src and dst strategy are the same
+ if (!sourceLocation.versionStrategy.get().getClass().getName()
+ .equals(desiredTargetLocation.versionStrategy.get().getClass().getName())) {
+ log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
+ sourceLocation.versionStrategy.get().getClass().getName(),
+ desiredTargetLocation.versionStrategy.get().getClass().getName());
+ return builder.build();
+ }
+
multiTimer.nextStage(Stages.SOURCE_PATH_LISTING);
// These are the paths at the source
Map<Path, FileStatus> sourcePaths = sourceLocation.getPaths();
@@ -616,8 +638,23 @@ public class HiveCopyEntityHelper {
if (desiredTargetExistingPaths.containsKey(newPath)) {
// If the file exists at the destination, check whether it should be replaced, if not, no need to copy
FileStatus existingTargetStatus = desiredTargetExistingPaths.get(newPath);
- if (!helper.shouldReplaceFile(existingTargetStatus, sourcePath)) {
- shouldCopy = false;
+
+ Comparable srcVer = sourceLocation.versionStrategy.get().getVersion(sourcePath.getPath());
+ Comparable dstVer = desiredTargetLocation.versionStrategy.get().getVersion(existingTargetStatus.getPath());
+
+ // destination has higher version, skip the copy
+ if (srcVer.compareTo(dstVer) <= 0) {
+ if (!helper.isEnforceFileSizeMatch() || existingTargetStatus.getLen() == sourcePath.getLen()) {
+ log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be skipped.",
+ sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
+ shouldCopy = false;
+ } else {
+ log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be skipped due to unmatched file length.",
+ sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
+ }
+ } else {
+ log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
+ sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
}
}
if (shouldCopy) {
@@ -663,11 +700,6 @@ public class HiveCopyEntityHelper {
return builder.build();
}
- private static boolean shouldReplaceFile(FileStatus referencePath, FileStatus replacementFile) {
- return replacementFile.getLen() != referencePath.getLen()
- || referencePath.getModificationTime() < replacementFile.getModificationTime();
- }
-
private void checkPartitionedTableCompatibility(Table desiredTargetTable, Table existingTargetTable)
throws IOException {
if (!desiredTargetTable.getDataLocation().equals(existingTargetTable.getDataLocation())) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java
index d31c9cb..64969de 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveLocationDescriptor.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Properties;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,15 +31,19 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapred.InputFormat;
+import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import org.apache.gobblin.data.management.copy.RecursivePathFinder;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
/**
* Contains data for a Hive location as well as additional data if {@link #HIVE_DATASET_COPY_ADDITIONAL_PATHS_RECURSIVELY_ENABLED} set to true.
*/
@Data
+@Slf4j
public class HiveLocationDescriptor {
public static final String HIVE_DATASET_COPY_ADDITIONAL_PATHS_RECURSIVELY_ENABLED =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".copy.additional.paths.recursively.enabled";
@@ -57,6 +62,16 @@ public class HiveLocationDescriptor {
protected final InputFormat<?, ?> inputFormat;
protected final FileSystem fileSystem;
protected final Properties properties;
+ protected Optional<DataFileVersionStrategy> versionStrategy;
+
+ protected void populateDataFileVersionStrategy() {
+ try {
+ this.versionStrategy = Optional.of(DataFileVersionStrategy
+ .instantiateDataFileVersionStrategy(fileSystem, ConfigUtils.propertiesToConfig(properties)));
+ } catch (IOException e) {
+ log.error("Cannot generate version strategy due to {}", e);
+ }
+ }
public Map<Path, FileStatus> getPaths() throws IOException {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index 2f252cc..91a2e67 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -39,6 +40,8 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import lombok.AllArgsConstructor;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -77,6 +80,7 @@ public class HiveCopyEntityHelperTest {
MultiTimingEvent timer = Mockito.mock(MultiTimingEvent.class);
HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+ Mockito.when(helper.isEnforceFileSizeMatch()).thenReturn(true);
HiveTargetPathHelper targetPathHelper = Mockito.mock(HiveTargetPathHelper.class);
Mockito.when(targetPathHelper
.getTargetPath(Mockito.any(Path.class), Mockito.any(FileSystem.class), Mockito.any(Optional.class),
@@ -139,6 +143,7 @@ public class HiveCopyEntityHelperTest {
HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
MultiTimingEvent timer = Mockito.mock(MultiTimingEvent.class);
HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+ Mockito.when(helper.isEnforceFileSizeMatch()).thenReturn(true);
HiveTargetPathHelper targetPathHelper = Mockito.mock(HiveTargetPathHelper.class);
Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
Mockito.when(hiveDataset.getTable()).thenReturn(table);
@@ -200,6 +205,7 @@ public class HiveCopyEntityHelperTest {
HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
MultiTimingEvent timer = Mockito.mock(MultiTimingEvent.class);
HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+ Mockito.when(helper.isEnforceFileSizeMatch()).thenReturn(true);
HiveTargetPathHelper targetPathHelper = Mockito.mock(HiveTargetPathHelper.class);
Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
Mockito.when(hiveDataset.getTable()).thenReturn(table);
@@ -358,7 +364,7 @@ public class HiveCopyEntityHelperTest {
Map<Path, FileStatus> paths;
public TestLocationDescriptor(Map<Path, FileStatus> paths) {
- super(null, null, null, null);
+ super(null, null, new TestLocationFs(paths), new Properties());
this.paths = paths;
}
@@ -369,4 +375,11 @@ public class HiveCopyEntityHelperTest {
}
}
+  @AllArgsConstructor
+  static class TestLocationFs extends LocalFileSystem { // answers getFileStatus from the supplied map only
+    private final Map<Path, FileStatus> paths;
+    @Override public FileStatus getFileStatus(Path f) throws IOException {
+      return paths.get(f); // null for paths not present in the map
+    }
+  }
}