You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:22 UTC
[04/50] incubator-gobblin git commit: [GOBBLIN-381][GOBBLIN-368] Add
ability to filter hidden directories for ConfigBasedDatasets
[GOBBLIN-381][GOBBLIN-368] Add ability to filter hidden directories for ConfigBasedDatasets
Closes #2260 from sv2000/gobblin-381
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11abf9f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11abf9f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11abf9f5
Branch: refs/heads/0.12.0
Commit: 11abf9f5f604a90376f73c01ed5e4b73b14c35cf
Parents: f2f6e46
Author: suvasude <su...@linkedin.biz>
Authored: Wed Jan 24 14:43:56 2018 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Jan 24 14:43:56 2018 -0800
----------------------------------------------------------------------
.../copy/replication/ConfigBasedDataset.java | 64 ++++----
.../copy/replication/HadoopFsEndPoint.java | 12 +-
.../replication/ReplicaHadoopFsEndPoint.java | 53 +++---
.../replication/SourceHadoopFsEndPoint.java | 37 +++--
.../replication/ConfigBasedDatasetTest.java | 162 +++++++++++++++++++
.../replication/ConfigBasedDatasetsTest.java | 135 ----------------
.../configBasedDatasetTest/src/_dir1/file1 | 1 +
.../configBasedDatasetTest/src/_dir1/file2 | 1 +
8 files changed, 263 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 3881323..293034b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -71,14 +69,21 @@ public class ConfigBasedDataset implements CopyableDataset {
private final ReplicationConfiguration rc;
private String datasetURN;
private boolean watermarkEnabled;
+ private final PathFilter pathFilter;
+
+ //Apply filter to directories
+ private final boolean applyFilterToDirectories;
public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute) {
this.props = props;
this.copyRoute = copyRoute;
this.rc = rc;
calculateDatasetURN();
- this.watermarkEnabled = Boolean.parseBoolean
- (this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true"));
+ this.watermarkEnabled =
+ Boolean.parseBoolean(this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true"));
+ this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
+ this.applyFilterToDirectories =
+ Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
}
public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) {
@@ -86,9 +91,12 @@ public class ConfigBasedDataset implements CopyableDataset {
this.copyRoute = copyRoute;
this.rc = rc;
this.datasetURN = datasetURN;
+ this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
+ this.applyFilterToDirectories =
+ Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
}
- private void calculateDatasetURN(){
+ private void calculateDatasetURN() {
EndPoint e = this.copyRoute.getCopyTo();
if (e instanceof HadoopFsEndPoint) {
HadoopFsEndPoint copyTo = (HadoopFsEndPoint) e;
@@ -120,6 +128,14 @@ public class ConfigBasedDataset implements CopyableDataset {
return copyableFiles;
}
+ //For {@link HadoopFsEndPoint}s, set pathFilter and applyFilterToDirectories
+ HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
+ HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
+ copyFrom.setPathFilter(pathFilter);
+ copyFrom.setApplyFilterToDirectories(applyFilterToDirectories);
+ copyTo.setPathFilter(pathFilter);
+ copyTo.setApplyFilterToDirectories(applyFilterToDirectories);
+
if (this.watermarkEnabled) {
if ((!copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()) || (
copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()
@@ -132,8 +148,6 @@ public class ConfigBasedDataset implements CopyableDataset {
}
}
- HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
- HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
Configuration conf = HadoopUtils.newConfiguration();
FileSystem copyFromFs = FileSystem.get(copyFrom.getFsURI(), conf);
FileSystem copyToFs = FileSystem.get(copyTo.getFsURI(), conf);
@@ -141,20 +155,10 @@ public class ConfigBasedDataset implements CopyableDataset {
Collection<FileStatus> allFilesInSource = copyFrom.getFiles();
Collection<FileStatus> allFilesInTarget = copyTo.getFiles();
- final PathFilter pathFilter = DatasetUtils.instantiatePathFilter(this.props);
- Predicate<FileStatus> predicate = new Predicate<FileStatus>() {
- @Override
- public boolean apply(FileStatus input) {
- return pathFilter.accept(input.getPath());
- }
- };
-
- Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(Collections2.filter(allFilesInSource, predicate));
+ Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(allFilesInSource);
Map<Path, FileStatus> copyToFileMap = Maps.newHashMap();
- for(FileStatus f: allFilesInTarget){
- if(pathFilter.accept(f.getPath())){
- copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f);
- }
+ for (FileStatus f : allFilesInTarget) {
+ copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f);
}
Collection<Path> deletedPaths = Lists.newArrayList();
@@ -184,10 +188,11 @@ public class ConfigBasedDataset implements CopyableDataset {
deletedPaths.add(newPath);
}
- copyableFiles
- .add(CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
- .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build());
-
+ copyableFiles.add(
+ CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath),
+ copyConfiguration)
+ .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString())
+ .build());
}
// clean up already checked paths
@@ -202,18 +207,17 @@ public class ConfigBasedDataset implements CopyableDataset {
// delete old files first
if (!deletedPaths.isEmpty()) {
DeleteFileCommitStep deleteCommitStep = DeleteFileCommitStep.fromPaths(copyToFs, deletedPaths, this.props);
- copyableFiles.add(new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(),
- deleteCommitStep, 0));
+ copyableFiles.add(
+ new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(), deleteCommitStep,
+ 0));
}
// generate the watermark file even if watermark checking is disabled. Make sure it can become functional once desired.
if ((!watermarkMetadataCopied) && copyFrom.getWatermark().isPresent()) {
- copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(),
+ copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(),
new WatermarkMetadataGenerationCommitStep(copyTo.getFsURI().toString(), copyTo.getDatasetPath(),
- copyFrom.getWatermark().get()),
- 1));
+ copyFrom.getWatermark().get()), 1));
}
return copyableFiles;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
index 97fd20e..ea93ea8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.data.management.copy.replication;
import java.io.IOException;
import java.net.URI;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,9 +30,15 @@ import com.typesafe.config.Config;
import org.apache.gobblin.util.HadoopUtils;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.PathFilter;
+
@Slf4j
-public abstract class HadoopFsEndPoint implements EndPoint{
+@Getter
+@Setter
+public abstract class HadoopFsEndPoint implements EndPoint {
+ private PathFilter pathFilter;
+ private boolean applyFilterToDirectories;
/**
*
@@ -56,7 +64,7 @@ public abstract class HadoopFsEndPoint implements EndPoint{
* @param path The path to be checked. For fs availability checking, just use "/"
* @return If the filesystem/path exists or not.
*/
- public boolean isPathAvailable(Path path){
+ public boolean isPathAvailable(Path path) {
try {
Configuration conf = HadoopUtils.newConfiguration();
FileSystem fs = FileSystem.get(this.getFsURI(), conf);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
index 8ab4e7e..024b239 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
@@ -72,30 +72,31 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
}
@Override
- public synchronized Collection<FileStatus> getFiles() throws IOException{
- if(filesInitialized){
+ public synchronized Collection<FileStatus> getFiles() throws IOException {
+ if (filesInitialized) {
return this.allFileStatus;
}
this.filesInitialized = true;
FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
- if(!fs.exists(this.rc.getPath())){
+ if (!fs.exists(this.rc.getPath())) {
return Collections.emptyList();
}
Collection<Path> validPaths = ReplicationDataValidPathPicker.getValidPaths(this);
- //ReplicationDataValidPathPicker.getValidPaths(fs, this.rc.getPath(), this.rdc);
+ //ReplicationDataValidPathPicker.getValidPaths(fs, this.rc.getPath(), this.rdc);
- for(Path p: validPaths){
- this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p));
+ for (Path p : validPaths) {
+ this.allFileStatus.addAll(
+ FileListUtils.listFilesRecursively(fs, p, super.getPathFilter(), super.isApplyFilterToDirectories()));
}
return this.allFileStatus;
}
@Override
public synchronized Optional<ComparableWatermark> getWatermark() {
- if(this.watermarkInitialized) {
+ if (this.watermarkInitialized) {
return this.cachedWatermark;
}
@@ -104,12 +105,12 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
Path metaData = new Path(rc.getPath(), WATERMARK_FILE);
FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
if (fs.exists(metaData)) {
- try(FSDataInputStream fin = fs.open(metaData)){
+ try (FSDataInputStream fin = fs.open(metaData)) {
InputStreamReader reader = new InputStreamReader(fin, Charsets.UTF_8);
String content = CharStreams.toString(reader);
Watermark w = WatermarkMetadataUtil.deserialize(content);
- if(w instanceof ComparableWatermark){
- this.cachedWatermark = Optional.of((ComparableWatermark)w);
+ if (w instanceof ComparableWatermark) {
+ this.cachedWatermark = Optional.of((ComparableWatermark) w);
}
}
return this.cachedWatermark;
@@ -120,7 +121,7 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
} catch (IOException e) {
log.warn("Can not find " + WATERMARK_FILE + " for replica " + this);
return this.cachedWatermark;
- } catch (WatermarkMetadataUtil.WatermarkMetadataMulFormatException e){
+ } catch (WatermarkMetadataUtil.WatermarkMetadataMulFormatException e) {
log.warn("Can not create watermark from " + WATERMARK_FILE + " for replica " + this);
return this.cachedWatermark;
}
@@ -143,8 +144,11 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
@Override
public String toString() {
- return Objects.toStringHelper(this.getClass()).add("is source", this.isSource()).add("end point name", this.getEndPointName())
- .add("hadoopfs config", this.rc).toString();
+ return Objects.toStringHelper(this.getClass())
+ .add("is source", this.isSource())
+ .add("end point name", this.getEndPointName())
+ .add("hadoopfs config", this.rc)
+ .toString();
}
@Override
@@ -153,7 +157,7 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
}
@Override
- public Path getDatasetPath(){
+ public Path getDatasetPath() {
return this.rc.getPath();
}
@@ -168,23 +172,30 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
ReplicaHadoopFsEndPoint other = (ReplicaHadoopFsEndPoint) obj;
if (rc == null) {
- if (other.rc != null)
+ if (other.rc != null) {
return false;
- } else if (!rc.equals(other.rc))
+ }
+ } else if (!rc.equals(other.rc)) {
return false;
+ }
if (replicaName == null) {
- if (other.replicaName != null)
+ if (other.replicaName != null) {
return false;
- } else if (!replicaName.equals(other.replicaName))
+ }
+ } else if (!replicaName.equals(other.replicaName)) {
return false;
+ }
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
index 0769c5c..2a56f2e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
@@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
+public class SourceHadoopFsEndPoint extends HadoopFsEndPoint {
@Getter
private final HadoopFsReplicaConfig rc;
@@ -57,8 +57,8 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
}
@Override
- public synchronized Collection<FileStatus> getFiles() throws IOException{
- if(!this.initialized){
+ public synchronized Collection<FileStatus> getFiles() throws IOException {
+ if (!this.initialized) {
this.getWatermark();
}
return this.allFileStatus;
@@ -66,7 +66,7 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
@Override
public synchronized Optional<ComparableWatermark> getWatermark() {
- if(this.initialized) {
+ if (this.initialized) {
return this.cachedWatermark;
}
try {
@@ -74,8 +74,9 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
Collection<Path> validPaths = ReplicationDataValidPathPicker.getValidPaths(this);
- for(Path p: validPaths){
- this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p));
+ for (Path p : validPaths) {
+ this.allFileStatus.addAll(
+ FileListUtils.listFilesRecursively(fs, p, super.getPathFilter(), super.isApplyFilterToDirectories()));
}
for (FileStatus f : this.allFileStatus) {
@@ -115,8 +116,11 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
@Override
public String toString() {
- return Objects.toStringHelper(this.getClass()).add("is source", this.isSource()).add("end point name", this.getEndPointName())
- .add("hadoopfs config", this.rc).toString();
+ return Objects.toStringHelper(this.getClass())
+ .add("is source", this.isSource())
+ .add("end point name", this.getEndPointName())
+ .add("hadoopfs config", this.rc)
+ .toString();
}
@Override
@@ -125,7 +129,7 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
}
@Override
- public Path getDatasetPath(){
+ public Path getDatasetPath() {
return this.rc.getPath();
}
@@ -139,18 +143,23 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
SourceHadoopFsEndPoint other = (SourceHadoopFsEndPoint) obj;
if (rc == null) {
- if (other.rc != null)
+ if (other.rc != null) {
return false;
- } else if (!rc.equals(other.rc))
+ }
+ } else if (!rc.equals(other.rc)) {
return false;
+ }
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
new file mode 100644
index 0000000..f925243
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.replication;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.source.extractor.ComparableWatermark;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+
+
+/**
+ * Unit test for {@link ConfigBasedDataset}
+ * @author mitu
+ *
+ */
+@Test(groups = {"gobblin.data.management.copy.replication"})
+@Slf4j
+public class ConfigBasedDatasetTest {
+
+ public Collection<? extends CopyEntity> testGetCopyableFilesHelper(String sourceDir, String destinationDir,
+ long sourceWatermark, boolean isFilterEnabled) throws Exception {
+ FileSystem localFs = FileSystem.getLocal(new Configuration());
+ URI local = localFs.getUri();
+
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher");
+ PathFilter pathFilter = DatasetUtils.instantiatePathFilter(properties);
+ boolean applyFilterToDirectories = false;
+ if (isFilterEnabled) {
+ properties.setProperty(DatasetUtils.CONFIGURATION_KEY_PREFIX + "path.filter.class",
+ "org.apache.gobblin.util.filters.HiddenFilter");
+ properties.setProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "true");
+
+ pathFilter = DatasetUtils.instantiatePathFilter(properties);
+ applyFilterToDirectories =
+ Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
+ }
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties)
+ .publishDir(new Path(destinationDir))
+ .preserve(PreserveAttributes.fromMnemonicString("ugp"))
+ .build();
+
+ ReplicationMetaData mockMetaData = Mockito.mock(ReplicationMetaData.class);
+ Mockito.when(mockMetaData.toString()).thenReturn("Mock Meta Data");
+
+ ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
+ Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
+ Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
+
+ HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
+ Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));
+ Mockito.when(copyFrom.getFsURI()).thenReturn(local);
+ ComparableWatermark sw = new LongWatermark(sourceWatermark);
+ Mockito.when(copyFrom.getWatermark()).thenReturn(Optional.of(sw));
+ Mockito.when(copyFrom.getFiles())
+ .thenReturn(
+ FileListUtils.listFilesRecursively(localFs, new Path(sourceDir), pathFilter, applyFilterToDirectories));
+
+ HadoopFsEndPoint copyTo = Mockito.mock(HadoopFsEndPoint.class);
+ Mockito.when(copyTo.getDatasetPath()).thenReturn(new Path(destinationDir));
+ Mockito.when(copyTo.getFsURI()).thenReturn(local);
+ Optional<ComparableWatermark> tmp = Optional.absent();
+ Mockito.when(copyTo.getWatermark()).thenReturn(tmp);
+ Mockito.when(copyTo.getFiles())
+ .thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(destinationDir), pathFilter,
+ applyFilterToDirectories));
+
+ CopyRoute route = Mockito.mock(CopyRoute.class);
+ Mockito.when(route.getCopyFrom()).thenReturn(copyFrom);
+ Mockito.when(route.getCopyTo()).thenReturn(copyTo);
+
+ ConfigBasedDataset dataset = new ConfigBasedDataset(mockRC, properties, route);
+ Collection<? extends CopyEntity> copyableFiles = dataset.getCopyableFiles(localFs, copyConfiguration);
+ return copyableFiles;
+ }
+
+ @Test
+ public void testGetCopyableFiles() throws Exception {
+ String sourceDir = getClass().getClassLoader().getResource("configBasedDatasetTest/src").getFile();
+ String destinationDir = getClass().getClassLoader().getResource("configBasedDatasetTest/dest").getFile();
+ long sourceWatermark = 100L;
+
+ Collection<? extends CopyEntity> copyableFiles =
+ testGetCopyableFilesHelper(sourceDir, destinationDir, sourceWatermark, false);
+ Assert.assertEquals(copyableFiles.size(), 8);
+ copyableFiles = testGetCopyableFilesHelper(sourceDir, destinationDir, sourceWatermark, true);
+ Assert.assertEquals(copyableFiles.size(), 6);
+
+ Set<Path> paths =
+ Sets.newHashSet(new Path("dir1/file2"), new Path("dir1/file1"), new Path("dir2/file1"), new Path("dir2/file3"));
+ for (CopyEntity copyEntity : copyableFiles) {
+ if (copyEntity instanceof CopyableFile) {
+ CopyableFile file = (CopyableFile) copyEntity;
+ Path originRelativePath =
+ PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getOrigin().getPath()),
+ PathUtils.getPathWithoutSchemeAndAuthority(new Path(sourceDir)));
+ Path targetRelativePath =
+ PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination()),
+ PathUtils.getPathWithoutSchemeAndAuthority(new Path(destinationDir)));
+
+ Assert.assertTrue(paths.contains(originRelativePath));
+ Assert.assertTrue(paths.contains(targetRelativePath));
+ Assert.assertEquals(originRelativePath, targetRelativePath);
+ } else if (copyEntity instanceof PrePublishStep) {
+ PrePublishStep pre = (PrePublishStep) copyEntity;
+ Assert.assertTrue(pre.getStep() instanceof DeleteFileCommitStep);
+ // need to delete this file
+ Assert.assertTrue(pre.explain().indexOf("configBasedDatasetTest/dest/dir1/file1") > 0);
+ } else if (copyEntity instanceof PostPublishStep) {
+ PostPublishStep post = (PostPublishStep) copyEntity;
+ Assert.assertTrue(post.getStep() instanceof WatermarkMetadataGenerationCommitStep);
+ Assert.assertTrue(
+ post.explain().indexOf("dest/_metadata") > 0 && post.explain().indexOf("" + sourceWatermark) > 0);
+ } else {
+ throw new Exception("Wrong type");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java
deleted file mode 100644
index 1965a41..0000000
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.copy.replication;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.CopyEntity;
-import org.apache.gobblin.data.management.copy.CopyableFile;
-import org.apache.gobblin.data.management.copy.PreserveAttributes;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
-import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
-import org.apache.gobblin.source.extractor.ComparableWatermark;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
-import org.apache.gobblin.util.FileListUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-
-
-/**
- * Unit test for {@link ConfigBasedDatasets}
- * @author mitu
- *
- */
-@Test(groups = {"gobblin.data.management.copy.replication"})
-
-public class ConfigBasedDatasetsTest {
-
- @Test
- public void testGetCopyableFiles() throws Exception {
- String sourceDir = getClass().getClassLoader().getResource("configBasedDatasetTest/src").getFile();
- String destinationDir = getClass().getClassLoader().getResource("configBasedDatasetTest/dest").getFile();
- FileSystem localFs = FileSystem.getLocal(new Configuration());
- URI local = localFs.getUri();
- long sourceWatermark = 100L;
-
- Properties properties = new Properties();
- properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher");
-
- CopyConfiguration copyConfiguration =
- CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties).publishDir(new Path(destinationDir))
- .preserve(PreserveAttributes.fromMnemonicString("ugp")).build();
-
- ReplicationMetaData mockMetaData = Mockito.mock(ReplicationMetaData.class);
- Mockito.when(mockMetaData.toString()).thenReturn("Mock Meta Data");
-
- ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
- Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
- Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
-
- HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
- Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));
- Mockito.when(copyFrom.getFsURI()).thenReturn(local);
- ComparableWatermark sw = new LongWatermark(sourceWatermark);
- Mockito.when(copyFrom.getWatermark()).thenReturn(Optional.of(sw));
- Mockito.when(copyFrom.getFiles()).thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(sourceDir)));
-
- HadoopFsEndPoint copyTo = Mockito.mock(HadoopFsEndPoint.class);
- Mockito.when(copyTo.getDatasetPath()).thenReturn(new Path(destinationDir));
- Mockito.when(copyTo.getFsURI()).thenReturn(local);
- Optional<ComparableWatermark>tmp = Optional.absent();
- Mockito.when(copyTo.getWatermark()).thenReturn(tmp);
- Mockito.when(copyTo.getFiles()).thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(destinationDir)));
-
- CopyRoute route = Mockito.mock(CopyRoute.class);
- Mockito.when(route.getCopyFrom()).thenReturn(copyFrom);
- Mockito.when(route.getCopyTo()).thenReturn(copyTo);
-
- ConfigBasedDataset dataset = new ConfigBasedDataset(mockRC, properties, route);
-
- Collection<? extends CopyEntity> copyableFiles = dataset.getCopyableFiles(localFs, copyConfiguration);
- Assert.assertEquals(copyableFiles.size(), 6);
-
- Set<Path> paths = Sets.newHashSet(new Path("dir1/file2"), new Path("dir1/file1"), new Path("dir2/file1"), new Path("dir2/file3"));
- for (CopyEntity copyEntity : copyableFiles) {
- if(copyEntity instanceof CopyableFile) {
- CopyableFile file = (CopyableFile) copyEntity;
- Path originRelativePath =
- PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getOrigin().getPath()),
- PathUtils.getPathWithoutSchemeAndAuthority(new Path(sourceDir)));
- Path targetRelativePath =
- PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination()),
- PathUtils.getPathWithoutSchemeAndAuthority(new Path(destinationDir)));
-
- Assert.assertTrue(paths.contains(originRelativePath));
- Assert.assertTrue(paths.contains(targetRelativePath));
- Assert.assertEquals(originRelativePath, targetRelativePath);
- }
- else if(copyEntity instanceof PrePublishStep){
- PrePublishStep pre = (PrePublishStep)copyEntity;
- Assert.assertTrue(pre.getStep() instanceof DeleteFileCommitStep);
- // need to delete this file
- Assert.assertTrue(pre.explain().indexOf("configBasedDatasetTest/dest/dir1/file1") > 0);
- }
- else if(copyEntity instanceof PostPublishStep){
- PostPublishStep post = (PostPublishStep)copyEntity;
- Assert.assertTrue(post.getStep() instanceof WatermarkMetadataGenerationCommitStep);
- Assert.assertTrue(post.explain().indexOf("dest/_metadata") > 0 && post.explain().indexOf(""+sourceWatermark)>0);
- }
- else{
- throw new Exception("Wrong type");
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1 b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1
new file mode 100644
index 0000000..d87e628
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1
@@ -0,0 +1 @@
+_dir1:file1content
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2 b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2
new file mode 100644
index 0000000..248eb41
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2
@@ -0,0 +1 @@
+_dir1:file2content