Posted to commits@gobblin.apache.org by ib...@apache.org on 2018/01/24 22:44:01 UTC

incubator-gobblin git commit: [GOBBLIN-381][GOBBLIN-368] Add ability to filter hidden directories for ConfigBasedDatasets

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f2f6e4685 -> 11abf9f5f


[GOBBLIN-381][GOBBLIN-368] Add ability to filter hidden directories for ConfigBasedDatasets

Closes #2260 from sv2000/gobblin-381
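
For readers who want to switch the new filtering on: a minimal sketch of the job-side wiring, assuming only the property keys exercised by the new ConfigBasedDatasetTest further down. The literal string behind CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES is not spelled out in this diff, so the constant is referenced directly.

    import java.util.Properties;

    import org.apache.hadoop.fs.PathFilter;

    import org.apache.gobblin.data.management.copy.CopyConfiguration;
    import org.apache.gobblin.data.management.dataset.DatasetUtils;

    public class HiddenFilterConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Choose the filter implementation; HiddenFilter is the one the new test uses.
        props.setProperty(DatasetUtils.CONFIGURATION_KEY_PREFIX + "path.filter.class",
            "org.apache.gobblin.util.filters.HiddenFilter");
        // New in this change: also apply the filter to directories, not just files.
        props.setProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "true");

        // ConfigBasedDataset's constructors now perform exactly this resolution internally.
        PathFilter filter = DatasetUtils.instantiatePathFilter(props);
        boolean applyToDirs = Boolean.parseBoolean(
            props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
        System.out.println(filter.getClass().getName() + ", applyFilterToDirectories=" + applyToDirs);
      }
    }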


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11abf9f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11abf9f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11abf9f5

Branch: refs/heads/master
Commit: 11abf9f5f604a90376f73c01ed5e4b73b14c35cf
Parents: f2f6e46
Author: suvasude <su...@linkedin.biz>
Authored: Wed Jan 24 14:43:56 2018 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Jan 24 14:43:56 2018 -0800

----------------------------------------------------------------------
 .../copy/replication/ConfigBasedDataset.java    |  64 ++++----
 .../copy/replication/HadoopFsEndPoint.java      |  12 +-
 .../replication/ReplicaHadoopFsEndPoint.java    |  53 +++---
 .../replication/SourceHadoopFsEndPoint.java     |  37 +++--
 .../replication/ConfigBasedDatasetTest.java     | 162 +++++++++++++++++++
 .../replication/ConfigBasedDatasetsTest.java    | 135 ----------------
 .../configBasedDatasetTest/src/_dir1/file1      |   1 +
 .../configBasedDatasetTest/src/_dir1/file2      |   1 +
 8 files changed, 263 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 3881323..293034b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -71,14 +69,21 @@ public class ConfigBasedDataset implements CopyableDataset {
   private final ReplicationConfiguration rc;
   private String datasetURN;
   private boolean watermarkEnabled;
+  private final PathFilter pathFilter;
+
+  // Whether the path filter should also be applied to directories
+  private final boolean applyFilterToDirectories;
 
   public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute) {
     this.props = props;
     this.copyRoute = copyRoute;
     this.rc = rc;
     calculateDatasetURN();
-    this.watermarkEnabled = Boolean.parseBoolean
-        (this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true"));
+    this.watermarkEnabled =
+        Boolean.parseBoolean(this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true"));
+    this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
+    this.applyFilterToDirectories =
+        Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
   }
 
   public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) {
@@ -86,9 +91,12 @@ public class ConfigBasedDataset implements CopyableDataset {
     this.copyRoute = copyRoute;
     this.rc = rc;
     this.datasetURN = datasetURN;
+    this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
+    this.applyFilterToDirectories =
+        Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
   }
 
-  private void calculateDatasetURN(){
+  private void calculateDatasetURN() {
     EndPoint e = this.copyRoute.getCopyTo();
     if (e instanceof HadoopFsEndPoint) {
       HadoopFsEndPoint copyTo = (HadoopFsEndPoint) e;
@@ -120,6 +128,14 @@ public class ConfigBasedDataset implements CopyableDataset {
       return copyableFiles;
     }
 
+    // For {@link HadoopFsEndPoint}s, set pathFilter and applyFilterToDirectories
+    HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
+    HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
+    copyFrom.setPathFilter(pathFilter);
+    copyFrom.setApplyFilterToDirectories(applyFilterToDirectories);
+    copyTo.setPathFilter(pathFilter);
+    copyTo.setApplyFilterToDirectories(applyFilterToDirectories);
+
     if (this.watermarkEnabled) {
       if ((!copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()) || (
           copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()
@@ -132,8 +148,6 @@ public class ConfigBasedDataset implements CopyableDataset {
       }
     }
 
-    HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
-    HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;
     Configuration conf = HadoopUtils.newConfiguration();
     FileSystem copyFromFs = FileSystem.get(copyFrom.getFsURI(), conf);
     FileSystem copyToFs = FileSystem.get(copyTo.getFsURI(), conf);
@@ -141,20 +155,10 @@ public class ConfigBasedDataset implements CopyableDataset {
     Collection<FileStatus> allFilesInSource = copyFrom.getFiles();
     Collection<FileStatus> allFilesInTarget = copyTo.getFiles();
 
-    final PathFilter pathFilter = DatasetUtils.instantiatePathFilter(this.props);
-    Predicate<FileStatus> predicate = new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(FileStatus input) {
-        return pathFilter.accept(input.getPath());
-      }
-    };
-
-    Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(Collections2.filter(allFilesInSource, predicate));
+    Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(allFilesInSource);
     Map<Path, FileStatus> copyToFileMap = Maps.newHashMap();
-    for(FileStatus f: allFilesInTarget){
-      if(pathFilter.accept(f.getPath())){
-        copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f);
-      }
+    for (FileStatus f : allFilesInTarget) {
+      copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f);
     }
 
     Collection<Path> deletedPaths = Lists.newArrayList();
@@ -184,10 +188,11 @@ public class ConfigBasedDataset implements CopyableDataset {
           deletedPaths.add(newPath);
         }
 
-        copyableFiles
-            .add(CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
-                .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build());
-
+        copyableFiles.add(
+            CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath),
+                copyConfiguration)
+                .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString())
+                .build());
       }
 
       // clean up already checked paths
@@ -202,18 +207,17 @@ public class ConfigBasedDataset implements CopyableDataset {
     // delete old files first
     if (!deletedPaths.isEmpty()) {
       DeleteFileCommitStep deleteCommitStep = DeleteFileCommitStep.fromPaths(copyToFs, deletedPaths, this.props);
-      copyableFiles.add(new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(),
-          deleteCommitStep, 0));
+      copyableFiles.add(
+          new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(), deleteCommitStep,
+              0));
     }
 
     // Generate the watermark file even if watermark checking is disabled, so it is ready to use once desired.
     if ((!watermarkMetadataCopied) && copyFrom.getWatermark().isPresent()) {
-      copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(),
+      copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(),
           new WatermarkMetadataGenerationCommitStep(copyTo.getFsURI().toString(), copyTo.getDatasetPath(),
-              copyFrom.getWatermark().get()),
-          1));
+              copyFrom.getWatermark().get()), 1));
     }
     return copyableFiles;
   }
-
 }
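
The PathFilter threaded through above is Hadoop's one-method org.apache.hadoop.fs.PathFilter interface. For orientation, an illustrative filter in the spirit of the org.apache.gobblin.util.filters.HiddenFilter referenced by the new test; this is a sketch of the usual hidden-path convention, not the actual implementation.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Illustrative only: rejects any path whose final component is "hidden" by the
    // common Hadoop convention of a leading '.' or '_' (cf. the _dir1 fixtures below).
    public class HiddenPathFilterSketch implements PathFilter {
      @Override
      public boolean accept(Path path) {
        String name = path.getName();
        return !name.startsWith(".") && !name.startsWith("_");
      }
    }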

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
index 97fd20e..ea93ea8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.data.management.copy.replication;
 import java.io.IOException;
 import java.net.URI;
 
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,9 +30,15 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.util.HadoopUtils;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.PathFilter;
+
 
 @Slf4j
-public abstract class HadoopFsEndPoint implements EndPoint{
+@Getter
+@Setter
+public abstract class HadoopFsEndPoint implements EndPoint {
+  private PathFilter pathFilter;
+  private boolean applyFilterToDirectories;
 
   /**
    *
@@ -56,7 +64,7 @@ public abstract class HadoopFsEndPoint implements EndPoint{
    * @param path The path to be checked. For fs availability checking, just use "/"
    * @return If the filesystem/path exists or not.
    */
-  public boolean isPathAvailable(Path path){
+  public boolean isPathAvailable(Path path) {
     try {
       Configuration conf = HadoopUtils.newConfiguration();
       FileSystem fs = FileSystem.get(this.getFsURI(), conf);
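
The Lombok @Getter/@Setter pair on this abstract class provides the accessors used throughout the change (super.getPathFilter(), super.isApplyFilterToDirectories(), and the setters called from ConfigBasedDataset). A hand-written equivalent of what Lombok generates for the two new fields, as a sketch for readers unfamiliar with Lombok:

    import org.apache.hadoop.fs.PathFilter;

    // Boolean getters are generated with an "is" prefix, hence isApplyFilterToDirectories().
    public abstract class HadoopFsEndPointSketch {
      private PathFilter pathFilter;
      private boolean applyFilterToDirectories;

      public PathFilter getPathFilter() { return this.pathFilter; }
      public void setPathFilter(PathFilter pathFilter) { this.pathFilter = pathFilter; }
      public boolean isApplyFilterToDirectories() { return this.applyFilterToDirectories; }
      public void setApplyFilterToDirectories(boolean apply) { this.applyFilterToDirectories = apply; }
    }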

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
index 8ab4e7e..024b239 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java
@@ -72,30 +72,31 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
   }
 
   @Override
-  public synchronized Collection<FileStatus> getFiles() throws IOException{
-    if(filesInitialized){
+  public synchronized Collection<FileStatus> getFiles() throws IOException {
+    if (filesInitialized) {
       return this.allFileStatus;
     }
 
     this.filesInitialized = true;
     FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
 
-    if(!fs.exists(this.rc.getPath())){
+    if (!fs.exists(this.rc.getPath())) {
       return Collections.emptyList();
     }
 
     Collection<Path> validPaths = ReplicationDataValidPathPicker.getValidPaths(this);
-        //ReplicationDataValidPathPicker.getValidPaths(fs, this.rc.getPath(), this.rdc);
+    //ReplicationDataValidPathPicker.getValidPaths(fs, this.rc.getPath(), this.rdc);
 
-    for(Path p: validPaths){
-      this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p));
+    for (Path p : validPaths) {
+      this.allFileStatus.addAll(
+          FileListUtils.listFilesRecursively(fs, p, super.getPathFilter(), super.isApplyFilterToDirectories()));
     }
     return this.allFileStatus;
   }
 
   @Override
   public synchronized Optional<ComparableWatermark> getWatermark() {
-    if(this.watermarkInitialized) {
+    if (this.watermarkInitialized) {
       return this.cachedWatermark;
     }
 
@@ -104,12 +105,12 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
       Path metaData = new Path(rc.getPath(), WATERMARK_FILE);
       FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
       if (fs.exists(metaData)) {
-        try(FSDataInputStream fin = fs.open(metaData)){
+        try (FSDataInputStream fin = fs.open(metaData)) {
           InputStreamReader reader = new InputStreamReader(fin, Charsets.UTF_8);
           String content = CharStreams.toString(reader);
           Watermark w = WatermarkMetadataUtil.deserialize(content);
-          if(w instanceof ComparableWatermark){
-            this.cachedWatermark = Optional.of((ComparableWatermark)w);
+          if (w instanceof ComparableWatermark) {
+            this.cachedWatermark = Optional.of((ComparableWatermark) w);
           }
         }
         return this.cachedWatermark;
@@ -120,7 +121,7 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
     } catch (IOException e) {
       log.warn("Can not find " + WATERMARK_FILE + " for replica " + this);
       return this.cachedWatermark;
-    } catch (WatermarkMetadataUtil.WatermarkMetadataMulFormatException e){
+    } catch (WatermarkMetadataUtil.WatermarkMetadataMulFormatException e) {
       log.warn("Can not create watermark from " + WATERMARK_FILE + " for replica " + this);
       return this.cachedWatermark;
     }
@@ -143,8 +144,11 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
 
   @Override
   public String toString() {
-    return Objects.toStringHelper(this.getClass()).add("is source", this.isSource()).add("end point name", this.getEndPointName())
-        .add("hadoopfs config", this.rc).toString();
+    return Objects.toStringHelper(this.getClass())
+        .add("is source", this.isSource())
+        .add("end point name", this.getEndPointName())
+        .add("hadoopfs config", this.rc)
+        .toString();
   }
 
   @Override
@@ -153,7 +157,7 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
   }
 
   @Override
-  public Path getDatasetPath(){
+  public Path getDatasetPath() {
     return this.rc.getPath();
   }
 
@@ -168,23 +172,30 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint {
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj)
+    if (this == obj) {
       return true;
-    if (obj == null)
+    }
+    if (obj == null) {
       return false;
-    if (getClass() != obj.getClass())
+    }
+    if (getClass() != obj.getClass()) {
       return false;
+    }
     ReplicaHadoopFsEndPoint other = (ReplicaHadoopFsEndPoint) obj;
     if (rc == null) {
-      if (other.rc != null)
+      if (other.rc != null) {
         return false;
-    } else if (!rc.equals(other.rc))
+      }
+    } else if (!rc.equals(other.rc)) {
       return false;
+    }
     if (replicaName == null) {
-      if (other.replicaName != null)
+      if (other.replicaName != null) {
         return false;
-    } else if (!replicaName.equals(other.replicaName))
+      }
+    } else if (!replicaName.equals(other.replicaName)) {
       return false;
+    }
     return true;
   }
 }
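
Both endpoint implementations now route listing through the four-argument FileListUtils.listFilesRecursively overload seen above. A hedged usage sketch, with a hypothetical local path and an inline hidden-path filter standing in for the configured one:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    import org.apache.gobblin.util.FileListUtils;

    public class FilteredListingSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        PathFilter hidden = path -> !path.getName().startsWith(".") && !path.getName().startsWith("_");
        // Passing true for applyFilterToDirectories means the filter is consulted for
        // directories too, so a hidden directory like _dir1 is pruned with its contents.
        for (FileStatus status :
            FileListUtils.listFilesRecursively(fs, new Path("/tmp/some/dataset"), hidden, true)) {
          System.out.println(status.getPath());
        }
      }
    }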

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
index 0769c5c..2a56f2e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java
@@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
-public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
+public class SourceHadoopFsEndPoint extends HadoopFsEndPoint {
 
   @Getter
   private final HadoopFsReplicaConfig rc;
@@ -57,8 +57,8 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
   }
 
   @Override
-  public synchronized Collection<FileStatus> getFiles() throws IOException{
-    if(!this.initialized){
+  public synchronized Collection<FileStatus> getFiles() throws IOException {
+    if (!this.initialized) {
       this.getWatermark();
     }
     return this.allFileStatus;
@@ -66,7 +66,7 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
 
   @Override
   public synchronized Optional<ComparableWatermark> getWatermark() {
-    if(this.initialized) {
+    if (this.initialized) {
       return this.cachedWatermark;
     }
     try {
@@ -74,8 +74,9 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
       FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration());
 
       Collection<Path> validPaths = ReplicationDataValidPathPicker.getValidPaths(this);
-      for(Path p: validPaths){
-        this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p));
+      for (Path p : validPaths) {
+        this.allFileStatus.addAll(
+            FileListUtils.listFilesRecursively(fs, p, super.getPathFilter(), super.isApplyFilterToDirectories()));
       }
 
       for (FileStatus f : this.allFileStatus) {
@@ -115,8 +116,11 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
 
   @Override
   public String toString() {
-    return Objects.toStringHelper(this.getClass()).add("is source", this.isSource()).add("end point name", this.getEndPointName())
-        .add("hadoopfs config", this.rc).toString();
+    return Objects.toStringHelper(this.getClass())
+        .add("is source", this.isSource())
+        .add("end point name", this.getEndPointName())
+        .add("hadoopfs config", this.rc)
+        .toString();
   }
 
   @Override
@@ -125,7 +129,7 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
   }
 
   @Override
-  public Path getDatasetPath(){
+  public Path getDatasetPath() {
     return this.rc.getPath();
   }
 
@@ -139,18 +143,23 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj)
+    if (this == obj) {
       return true;
-    if (obj == null)
+    }
+    if (obj == null) {
       return false;
-    if (getClass() != obj.getClass())
+    }
+    if (getClass() != obj.getClass()) {
       return false;
+    }
     SourceHadoopFsEndPoint other = (SourceHadoopFsEndPoint) obj;
     if (rc == null) {
-      if (other.rc != null)
+      if (other.rc != null) {
         return false;
-    } else if (!rc.equals(other.rc))
+      }
+    } else if (!rc.equals(other.rc)) {
       return false;
+    }
     return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
new file mode 100644
index 0000000..f925243
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.replication;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.source.extractor.ComparableWatermark;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+
+
+/**
+ * Unit test for {@link ConfigBasedDataset}
+ * @author mitu
+ *
+ */
+@Test(groups = {"gobblin.data.management.copy.replication"})
+@Slf4j
+public class ConfigBasedDatasetTest {
+
+  public Collection<? extends CopyEntity> testGetCopyableFilesHelper(String sourceDir, String destinationDir,
+      long sourceWatermark, boolean isFilterEnabled) throws Exception {
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    URI local = localFs.getUri();
+
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher");
+    PathFilter pathFilter = DatasetUtils.instantiatePathFilter(properties);
+    boolean applyFilterToDirectories = false;
+    if (isFilterEnabled) {
+      properties.setProperty(DatasetUtils.CONFIGURATION_KEY_PREFIX + "path.filter.class",
+          "org.apache.gobblin.util.filters.HiddenFilter");
+      properties.setProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "true");
+
+      pathFilter = DatasetUtils.instantiatePathFilter(properties);
+      applyFilterToDirectories =
+          Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
+    }
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties)
+            .publishDir(new Path(destinationDir))
+            .preserve(PreserveAttributes.fromMnemonicString("ugp"))
+            .build();
+
+    ReplicationMetaData mockMetaData = Mockito.mock(ReplicationMetaData.class);
+    Mockito.when(mockMetaData.toString()).thenReturn("Mock Meta Data");
+
+    ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
+    Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
+    Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
+
+    HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
+    Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));
+    Mockito.when(copyFrom.getFsURI()).thenReturn(local);
+    ComparableWatermark sw = new LongWatermark(sourceWatermark);
+    Mockito.when(copyFrom.getWatermark()).thenReturn(Optional.of(sw));
+    Mockito.when(copyFrom.getFiles())
+        .thenReturn(
+            FileListUtils.listFilesRecursively(localFs, new Path(sourceDir), pathFilter, applyFilterToDirectories));
+
+    HadoopFsEndPoint copyTo = Mockito.mock(HadoopFsEndPoint.class);
+    Mockito.when(copyTo.getDatasetPath()).thenReturn(new Path(destinationDir));
+    Mockito.when(copyTo.getFsURI()).thenReturn(local);
+    Optional<ComparableWatermark> tmp = Optional.absent();
+    Mockito.when(copyTo.getWatermark()).thenReturn(tmp);
+    Mockito.when(copyTo.getFiles())
+        .thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(destinationDir), pathFilter,
+            applyFilterToDirectories));
+
+    CopyRoute route = Mockito.mock(CopyRoute.class);
+    Mockito.when(route.getCopyFrom()).thenReturn(copyFrom);
+    Mockito.when(route.getCopyTo()).thenReturn(copyTo);
+
+    ConfigBasedDataset dataset = new ConfigBasedDataset(mockRC, properties, route);
+    Collection<? extends CopyEntity> copyableFiles = dataset.getCopyableFiles(localFs, copyConfiguration);
+    return copyableFiles;
+  }
+
+  @Test
+  public void testGetCopyableFiles() throws Exception {
+    String sourceDir = getClass().getClassLoader().getResource("configBasedDatasetTest/src").getFile();
+    String destinationDir = getClass().getClassLoader().getResource("configBasedDatasetTest/dest").getFile();
+    long sourceWatermark = 100L;
+
+    Collection<? extends CopyEntity> copyableFiles =
+        testGetCopyableFilesHelper(sourceDir, destinationDir, sourceWatermark, false);
+    Assert.assertEquals(copyableFiles.size(), 8);
+    copyableFiles = testGetCopyableFilesHelper(sourceDir, destinationDir, sourceWatermark, true);
+    Assert.assertEquals(copyableFiles.size(), 6);
+
+    Set<Path> paths =
+        Sets.newHashSet(new Path("dir1/file2"), new Path("dir1/file1"), new Path("dir2/file1"), new Path("dir2/file3"));
+    for (CopyEntity copyEntity : copyableFiles) {
+      if (copyEntity instanceof CopyableFile) {
+        CopyableFile file = (CopyableFile) copyEntity;
+        Path originRelativePath =
+            PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getOrigin().getPath()),
+                PathUtils.getPathWithoutSchemeAndAuthority(new Path(sourceDir)));
+        Path targetRelativePath =
+            PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination()),
+                PathUtils.getPathWithoutSchemeAndAuthority(new Path(destinationDir)));
+
+        Assert.assertTrue(paths.contains(originRelativePath));
+        Assert.assertTrue(paths.contains(targetRelativePath));
+        Assert.assertEquals(originRelativePath, targetRelativePath);
+      } else if (copyEntity instanceof PrePublishStep) {
+        PrePublishStep pre = (PrePublishStep) copyEntity;
+        Assert.assertTrue(pre.getStep() instanceof DeleteFileCommitStep);
+        // need to delete this file
+        Assert.assertTrue(pre.explain().indexOf("configBasedDatasetTest/dest/dir1/file1") > 0);
+      } else if (copyEntity instanceof PostPublishStep) {
+        PostPublishStep post = (PostPublishStep) copyEntity;
+        Assert.assertTrue(post.getStep() instanceof WatermarkMetadataGenerationCommitStep);
+        Assert.assertTrue(
+            post.explain().indexOf("dest/_metadata") > 0 && post.explain().indexOf("" + sourceWatermark) > 0);
+      } else {
+        throw new Exception("Wrong type");
+      }
+    }
+  }
+}
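
A note on the 8-versus-6 assertion above: the unfiltered run yields 6 CopyableFiles (the four paths in the expected set plus the two new _dir1 fixtures) together with one PrePublishStep and one PostPublishStep; with HiddenFilter applied to directories, the _dir1 pair drops out. A throwaway sketch of that arithmetic, with the source layout inferred from the diffstat and the test's expected-path set:

    import java.util.Arrays;
    import java.util.List;

    public class CopyEntityCountSketch {
      public static void main(String[] args) {
        // Source files inferred from the expected-path set plus the two fixtures added here.
        List<String> sourceFiles = Arrays.asList(
            "dir1/file1", "dir1/file2", "dir2/file1", "dir2/file3", "_dir1/file1", "_dir1/file2");
        long visible = sourceFiles.stream()
            .filter(p -> !p.startsWith(".") && !p.startsWith("_"))
            .count();
        int steps = 2; // one PrePublishStep (delete) plus one PostPublishStep (watermark)
        System.out.println("unfiltered=" + (sourceFiles.size() + steps)   // 8
            + ", filtered=" + (visible + steps));                         // 6
      }
    }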

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java
deleted file mode 100644
index 1965a41..0000000
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.copy.replication;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.CopyEntity;
-import org.apache.gobblin.data.management.copy.CopyableFile;
-import org.apache.gobblin.data.management.copy.PreserveAttributes;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
-import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
-import org.apache.gobblin.source.extractor.ComparableWatermark;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
-import org.apache.gobblin.util.FileListUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-
-
-/**
- * Unit test for {@link ConfigBasedDatasets}
- * @author mitu
- *
- */
-@Test(groups = {"gobblin.data.management.copy.replication"})
-
-public class ConfigBasedDatasetsTest {
-
-  @Test
-  public void testGetCopyableFiles() throws Exception {
-    String sourceDir = getClass().getClassLoader().getResource("configBasedDatasetTest/src").getFile();
-    String destinationDir = getClass().getClassLoader().getResource("configBasedDatasetTest/dest").getFile();
-    FileSystem localFs = FileSystem.getLocal(new Configuration());
-    URI local = localFs.getUri();
-    long sourceWatermark = 100L;
-
-    Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher");
-
-    CopyConfiguration copyConfiguration =
-        CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties).publishDir(new Path(destinationDir))
-        .preserve(PreserveAttributes.fromMnemonicString("ugp")).build();
-
-    ReplicationMetaData mockMetaData = Mockito.mock(ReplicationMetaData.class);
-    Mockito.when(mockMetaData.toString()).thenReturn("Mock Meta Data");
-
-    ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class);
-    Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL);
-    Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData);
-
-    HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class);
-    Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir));
-    Mockito.when(copyFrom.getFsURI()).thenReturn(local);
-    ComparableWatermark sw = new LongWatermark(sourceWatermark);
-    Mockito.when(copyFrom.getWatermark()).thenReturn(Optional.of(sw));
-    Mockito.when(copyFrom.getFiles()).thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(sourceDir)));
-
-    HadoopFsEndPoint copyTo = Mockito.mock(HadoopFsEndPoint.class);
-    Mockito.when(copyTo.getDatasetPath()).thenReturn(new Path(destinationDir));
-    Mockito.when(copyTo.getFsURI()).thenReturn(local);
-    Optional<ComparableWatermark>tmp = Optional.absent();
-    Mockito.when(copyTo.getWatermark()).thenReturn(tmp);
-    Mockito.when(copyTo.getFiles()).thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(destinationDir)));
-
-    CopyRoute route = Mockito.mock(CopyRoute.class);
-    Mockito.when(route.getCopyFrom()).thenReturn(copyFrom);
-    Mockito.when(route.getCopyTo()).thenReturn(copyTo);
-
-    ConfigBasedDataset dataset = new ConfigBasedDataset(mockRC, properties, route);
-
-    Collection<? extends CopyEntity> copyableFiles = dataset.getCopyableFiles(localFs, copyConfiguration);
-    Assert.assertEquals(copyableFiles.size(), 6);
-
-    Set<Path> paths = Sets.newHashSet(new Path("dir1/file2"), new Path("dir1/file1"), new Path("dir2/file1"), new Path("dir2/file3"));
-    for (CopyEntity copyEntity : copyableFiles) {
-      if(copyEntity instanceof CopyableFile) {
-        CopyableFile file = (CopyableFile) copyEntity;
-        Path originRelativePath =
-            PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getOrigin().getPath()),
-                PathUtils.getPathWithoutSchemeAndAuthority(new Path(sourceDir)));
-        Path targetRelativePath =
-            PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination()),
-                PathUtils.getPathWithoutSchemeAndAuthority(new Path(destinationDir)));
-
-        Assert.assertTrue(paths.contains(originRelativePath));
-        Assert.assertTrue(paths.contains(targetRelativePath));
-        Assert.assertEquals(originRelativePath, targetRelativePath);
-      }
-      else if(copyEntity instanceof PrePublishStep){
-        PrePublishStep pre = (PrePublishStep)copyEntity;
-        Assert.assertTrue(pre.getStep() instanceof DeleteFileCommitStep);
-        // need to delete this file
-        Assert.assertTrue(pre.explain().indexOf("configBasedDatasetTest/dest/dir1/file1") > 0);
-      }
-      else if(copyEntity instanceof PostPublishStep){
-        PostPublishStep post = (PostPublishStep)copyEntity;
-        Assert.assertTrue(post.getStep() instanceof WatermarkMetadataGenerationCommitStep);
-        Assert.assertTrue(post.explain().indexOf("dest/_metadata") > 0 && post.explain().indexOf(""+sourceWatermark)>0);
-      }
-      else{
-        throw new Exception("Wrong type");
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1 b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1
new file mode 100644
index 0000000..d87e628
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1
@@ -0,0 +1 @@
+_dir1:file1content

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2 b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2
new file mode 100644
index 0000000..248eb41
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2
@@ -0,0 +1 @@
+_dir1:file2content