You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/07/21 20:16:00 UTC
[gobblin] branch master updated: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (#3528)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 930055460 [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (#3528)
930055460 is described below
commit 930055460043bd7427508e6b3a47078df50c0f00
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Jul 21 13:15:54 2022 -0700
[GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (#3528)
* Clean up TimeAwareRecursiveCopyableDataset to support seconds in timestamp, squash logs for timestamp paths that do not exist, and improve efficiency on search for non-nested timestamps
* Fix checkstyle
* Address review
* Refactor based on reviews to handle every date pattern format recursively without the iterator
* Calculate start and end date in helper
---
.../copy/TimeAwareRecursiveCopyableDataset.java | 190 ++++++++++-----------
.../management/copy/DateRangeIteratorTest.java | 67 --------
.../TimeAwareRecursiveCopyableDatasetTest.java | 45 ++++-
3 files changed, 133 insertions(+), 169 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index 73bc721e8..e0cdd9899 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -18,10 +18,13 @@
package org.apache.gobblin.data.management.copy;
import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,11 +37,11 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.gobblin.configuration.ConfigurationKeys;
+@Slf4j
public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset {
private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern";
@@ -49,134 +52,119 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
private final String lookbackTime;
private final String datePattern;
private final Period lookbackPeriod;
- private final boolean isPatternDaily;
- private final boolean isPatternHourly;
- private final boolean isPatternMinutely;
private final LocalDateTime currentTime;
- private final DatePattern pattern;
-
- enum DatePattern {
- MINUTELY, HOURLY, DAILY
- }
public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
super(fs, rootPath, properties, glob);
this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY);
- PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").toFormatter();
+ PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays()
+ .appendSuffix("d")
+ .appendHours()
+ .appendSuffix("h")
+ .appendMinutes()
+ .appendSuffix("m")
+ .toFormatter();
this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
- this.isPatternMinutely = isDatePatternMinutely(datePattern);
- this.isPatternHourly = !this.isPatternMinutely && isDatePatternHourly(datePattern);
- this.isPatternDaily = !this.isPatternMinutely && !this.isPatternHourly;
+
this.currentTime = properties.containsKey(DATE_PATTERN_TIMEZONE_KEY) ? LocalDateTime.now(
DateTimeZone.forID(DATE_PATTERN_TIMEZONE_KEY))
: LocalDateTime.now(DateTimeZone.forID(DEFAULT_DATE_PATTERN_TIMEZONE));
- if (this.isPatternDaily) {
- Preconditions.checkArgument(isLookbackTimeStringDaily(this.lookbackTime), "Expected day format for lookback time; found hourly or minutely format");
- pattern = DatePattern.DAILY;
- } else if (this.isPatternHourly) {
- Preconditions.checkArgument(isLookbackTimeStringHourly(this.lookbackTime), "Expected hourly format for lookback time; found minutely format");
- pattern = DatePattern.HOURLY;
- } else {
- pattern = DatePattern.MINUTELY;
- }
+ this.validateLookbackWithDatePatternFormat(this.datePattern, this.lookbackTime);
}
- /**
- * TODO: Replace it with {@link org.apache.gobblin.time.TimeIterator} as {@link LocalDateTime} will not adjust time
- * to a given time zone
- */
- public static class DateRangeIterator implements Iterator {
- private LocalDateTime startDate;
- private LocalDateTime endDate;
- private DatePattern datePattern;
-
- public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, DatePattern datePattern) {
- this.startDate = startDate;
- this.endDate = endDate;
- this.datePattern = datePattern;
- }
-
- @Override
- public boolean hasNext() {
- return !startDate.isAfter(endDate);
- }
-
- @Override
- public LocalDateTime next() {
- LocalDateTime dateTime = startDate;
-
- switch (datePattern) {
- case MINUTELY:
- startDate = startDate.plusMinutes(1);
- break;
- case HOURLY:
- startDate = startDate.plusHours(1);
- break;
- case DAILY:
- startDate = startDate.plusDays(1);
- break;
- }
-
- return dateTime;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private boolean isDatePatternHourly(String datePattern) {
+ void validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
- String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
- }
-
- private boolean isDatePatternMinutely(String datePattern) {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
- String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
- String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
- }
-
- private boolean isLookbackTimeStringDaily(String lookbackTime) {
- PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
- try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
- return false;
+ PeriodFormatterBuilder formatterBuilder;
+
+ // Validate that the lookback is supported for the time format
+ if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new PeriodFormatterBuilder().appendDays()
+ .appendSuffix("d")
+ .appendHours()
+ .appendSuffix("h")
+ .appendMinutes()
+ .appendSuffix("m")
+ .appendSeconds()
+ .appendSuffix("s");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s",
+ LOOKBACK_TIME_KEY));
+ }
+ } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new PeriodFormatterBuilder().appendDays()
+ .appendSuffix("d")
+ .appendHours()
+ .appendSuffix("h")
+ .appendMinutes()
+ .appendSuffix("m");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s",
+ LOOKBACK_TIME_KEY));
+ }
+ } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+ }
+ } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+ }
}
}
- private boolean isLookbackTimeStringHourly(String lookbackTime) {
- PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
+ formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+ } catch (IllegalArgumentException e) {
return false;
}
+ return true;
}
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
LocalDateTime endDate = currentTime;
- LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
+ LocalDateTime startDate = formatter.parseLocalDateTime(endDate.minus(this.lookbackPeriod).toString(this.datePattern));
+ return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1, startDate, endDate, formatter);
+ }
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
+ private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath, PathFilter fileFilter,
+ int level, LocalDateTime startDate, LocalDateTime endDate, DateTimeFormatter formatter) throws IOException {
List<FileStatus> fileStatuses = Lists.newArrayList();
- while (dateRangeIterator.hasNext()) {
- Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
- fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+ Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+
+ // Check if at the lowest level/granularity of the date folder
+ if (this.datePattern.split(FileSystems.getDefault().getSeparator()).length == level) {
+ // Truncate the start date to the most granular unit of time in the datepattern
+ while (folderIterator.hasNext()) {
+ Path folderPath = folderIterator.next().getPath();
+ String datePath = traversedDatePath.isEmpty() ? folderPath.getName() : new Path(traversedDatePath, folderPath.getName()).toString();
+ try {
+ LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+ if (!folderDate.isBefore(startDate) && !folderDate.isAfter(endDate)) {
+ fileStatuses.addAll(super.getFilesAtPath(fs, folderPath, fileFilter));
+ }
+ } catch (IllegalArgumentException e) {
+ log.warn(String.format(
+ "Folder at path %s is not convertible to format %s. Please confirm that argument %s is valid", datePath,
+ this.datePattern, DATE_PATTERN_KEY));
+ }
+ }
+ } else {
+ // folder has a format such as yyyy/mm/dd/hh, so recursively find date paths
+ while (folderIterator.hasNext()) {
+ // Start building the date from top-down
+ String nextDate = folderIterator.next().getPath().getName();
+ String datePath = traversedDatePath.isEmpty() ? nextDate : new Path(traversedDatePath, nextDate).toString();
+ fileStatuses.addAll(recursivelyGetFilesAtDatePath(fs, new Path(path, nextDate), datePath, fileFilter, level + 1, startDate, endDate, formatter));
+ }
}
return fileStatuses;
}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
deleted file mode 100644
index 47e72d672..000000000
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.copy;
-
-import org.joda.time.LocalDateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
-
-public class DateRangeIteratorTest {
-
- @Test
- public void testIterator() {
- LocalDateTime endDate = new LocalDateTime(2017, 1, 1, 0, 0, 0);
- LocalDateTime startDate = endDate.minusHours(2);
- String datePattern = "HH/yyyy/MM/dd";
- DateTimeFormatter format = DateTimeFormat.forPattern(datePattern);
- TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
- new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, TimeAwareRecursiveCopyableDataset.DatePattern.HOURLY);
- LocalDateTime dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "22/2016/12/31");
- dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "23/2016/12/31");
- dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "00/2017/01/01");
- Assert.assertEquals(dateRangeIterator.hasNext(), false);
-
- datePattern = "yyyy/MM/dd";
- format = DateTimeFormat.forPattern(datePattern);
- startDate = endDate.minusDays(1);
- dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, TimeAwareRecursiveCopyableDataset.DatePattern.DAILY);
- dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "2016/12/31");
- dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "2017/01/01");
- Assert.assertEquals(dateRangeIterator.hasNext(), false);
-
- datePattern = "yyyy-MM-dd-HH-mm";
- format = DateTimeFormat.forPattern(datePattern);
- startDate = endDate.minusHours(1);
- dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, TimeAwareRecursiveCopyableDataset.DatePattern.MINUTELY);
- dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-00");
- dateTime = dateRangeIterator.next();
- Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-01");
- Assert.assertEquals(dateRangeIterator.hasNext(), true);
- }
-}
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
index b684936bb..521caaafe 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -44,12 +45,13 @@ import org.testng.annotations.Test;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
-
+@Slf4j
public class TimeAwareRecursiveCopyableDatasetTest {
private FileSystem fs;
private Path baseDir1;
private Path baseDir2;
private Path baseDir3;
+ private Path baseDir4;
private static final String NUM_LOOKBACK_DAYS_STR = "2d";
private static final Integer NUM_LOOKBACK_DAYS = 2;
@@ -85,6 +87,12 @@ public class TimeAwareRecursiveCopyableDatasetTest {
fs.delete(baseDir3, true);
}
fs.mkdirs(baseDir3);
+
+ baseDir4 = new Path("/tmp/src/ds3/daily");
+ if (fs.exists(baseDir4)) {
+ fs.delete(baseDir4, true);
+ }
+ fs.mkdirs(baseDir4);
PeriodFormatter formatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
}
@@ -216,6 +224,40 @@ public class TimeAwareRecursiveCopyableDatasetTest {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
+ // test ds of daily/yyyy-MM-dd-HH-mm-ss
+ datePattern = "yyyy-MM-dd-HH-mm-ss";
+ formatter = DateTimeFormat.forPattern(datePattern);
+ endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+ candidateFiles = new HashSet<>();
+ for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+ String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);
+ if (i == 0) {
+ // avoid future dates on minutes, so have consistency test result
+ startDate = endDate.minusHours(i).withMinuteOfHour(0).withSecondOfMinute(0).toString(formatter);
+ }
+ Path subDirPath = new Path(baseDir4, new Path(startDate));
+ fs.mkdirs(subDirPath);
+ Path filePath = new Path(subDirPath, i + ".avro");
+ fs.create(filePath);
+ if (i < (NUM_LOOKBACK_DAYS + 1)) {
+ candidateFiles.add(filePath.toString());
+ }
+ }
+
+ properties = new Properties();
+ properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, "2d1h");
+ properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH-mm-ss");
+
+ dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir4, properties,
+ new Path("/tmp/src/ds3/daily"));
+
+ fileStatusList = dataset.getFilesAtPath(fs, baseDir4, pathFilter);
+
+ Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
+ for (FileStatus fileStatus: fileStatusList) {
+ Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+ }
}
@Test (expectedExceptions = IllegalArgumentException.class)
@@ -244,5 +286,6 @@ public class TimeAwareRecursiveCopyableDatasetTest {
this.fs.delete(baseDir1, true);
this.fs.delete(baseDir2, true);
this.fs.delete(baseDir3, true);
+ this.fs.delete(baseDir4, true);
}
}
\ No newline at end of file