You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2024/02/23 23:37:51 UTC
(gobblin) branch master updated: [GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bdbbfe18c [GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)
bdbbfe18c is described below
commit bdbbfe18cff0acee7833f148843cd371eaca8bdd
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Fri Feb 23 15:37:45 2024 -0800
[GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)
---
.../copy/UnixTimestampRecursiveCopyableDataset.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
index cd4f18867..366249b30 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
@@ -41,6 +41,8 @@ import org.joda.time.format.PeriodFormatterBuilder;
import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.AndPathFilter;
@@ -88,22 +90,23 @@ public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableData
* based on {@link #timestampPattern} and filters out the paths that are out the date range
*
*/
+ @AllArgsConstructor
class TimestampPathFilter implements PathFilter {
+ private final Path path;
@Override
public boolean accept(Path path) {
LocalDate endDate = currentTime.toLocalDate();
LocalDate startDate = endDate.minus(lookbackPeriod);
- Path relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), datasetRoot());
+ Path relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), this.path);
Matcher matcher = timestampPattern.matcher(relativePath.toString());
if (!matcher.matches()) {
return false;
}
Long timestamp = Long.parseLong(matcher.group(1));
LocalDate dateOfTimestamp = new LocalDateTime(timestamp, dateTimeZone).toLocalDate();
- return !(dateOfTimestamp == null || dateOfTimestamp.isAfter(endDate) || dateOfTimestamp.isEqual(startDate)
- || dateOfTimestamp.isBefore(startDate));
+ return !(dateOfTimestamp.isAfter(endDate) || dateOfTimestamp.isEqual(startDate) || dateOfTimestamp.isBefore(startDate));
}
}
@@ -111,8 +114,8 @@ public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableData
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
throws IOException {
- // Filter files by lookback period (fileNames >= startDate and fileNames <= endDate)
- PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter());
+ // Filter files by lookback period (fileNames > startDate and fileNames <= endDate)
+ PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter(path));
List<FileStatus> files = super.getFilesAtPath(fs, path, andPathFilter);
if (VersionSelectionPolicy.ALL == versionSelectionPolicy) {
@@ -122,7 +125,7 @@ public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableData
Map<Pair<String, LocalDate>, TreeMap<Long, List<FileStatus>>> pathTimestampFilesMap = new HashMap<>();
// Now select files per day based on version selection policy
for (FileStatus fileStatus : files) {
- String relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), datasetRoot()).toString();
+ String relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), path).toString();
Matcher matcher = timestampPattern.matcher(relativePath);
if (!matcher.matches()) {
continue;