You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2024/02/23 23:37:51 UTC

(gobblin) branch master updated: [GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bdbbfe18c [GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)
bdbbfe18c is described below

commit bdbbfe18cff0acee7833f148843cd371eaca8bdd
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Fri Feb 23 15:37:45 2024 -0800

    [GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)
---
 .../copy/UnixTimestampRecursiveCopyableDataset.java       | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
index cd4f18867..366249b30 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
@@ -41,6 +41,8 @@ import org.joda.time.format.PeriodFormatterBuilder;
 
 import com.google.common.collect.Lists;
 
+import lombok.AllArgsConstructor;
+
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.filters.AndPathFilter;
 
@@ -88,22 +90,23 @@ public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableData
    * based on {@link #timestampPattern} and filters out the paths that are out the date range
    *
    */
+  @AllArgsConstructor
   class TimestampPathFilter implements PathFilter {
+    private final Path path;
 
     @Override
     public boolean accept(Path path) {
 
       LocalDate endDate = currentTime.toLocalDate();
       LocalDate startDate = endDate.minus(lookbackPeriod);
-      Path relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), datasetRoot());
+      Path relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), this.path);
       Matcher matcher = timestampPattern.matcher(relativePath.toString());
       if (!matcher.matches()) {
         return false;
       }
       Long timestamp = Long.parseLong(matcher.group(1));
       LocalDate dateOfTimestamp = new LocalDateTime(timestamp, dateTimeZone).toLocalDate();
-      return !(dateOfTimestamp == null || dateOfTimestamp.isAfter(endDate) || dateOfTimestamp.isEqual(startDate)
-          || dateOfTimestamp.isBefore(startDate));
+      return !(dateOfTimestamp.isAfter(endDate) || dateOfTimestamp.isEqual(startDate) || dateOfTimestamp.isBefore(startDate));
     }
   }
 
@@ -111,8 +114,8 @@ public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableData
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
       throws IOException {
 
-    // Filter files by lookback period (fileNames >= startDate and fileNames <= endDate)
-    PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter());
+    // Filter files by lookback period (fileNames > startDate and fileNames <= endDate)
+    PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter(path));
     List<FileStatus> files = super.getFilesAtPath(fs, path, andPathFilter);
 
     if (VersionSelectionPolicy.ALL == versionSelectionPolicy) {
@@ -122,7 +125,7 @@ public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableData
     Map<Pair<String, LocalDate>, TreeMap<Long, List<FileStatus>>> pathTimestampFilesMap = new HashMap<>();
     // Now select files per day based on version selection policy
     for (FileStatus fileStatus : files) {
-      String relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), datasetRoot()).toString();
+      String relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), path).toString();
       Matcher matcher = timestampPattern.matcher(relativePath);
       if (!matcher.matches()) {
         continue;