Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/07/15 18:05:33 UTC

[GitHub] [gobblin] Will-Lo opened a new pull request, #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Will-Lo opened a new pull request, #3528:
URL: https://github.com/apache/gobblin/pull/3528

   …stamp, squash logs for timestamp paths that do not exist, and improve efficiency on search for non-nested timestamps
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1669
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   
   - Support seconds in the time iterator.
   - Optimize non-nested timestamp representations (e.g. `yyyy-MM-dd-HH-mm-ss`) so that they do not use an iterator; instead, list the top-level directory to reduce the number of FS calls.
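
A minimal sketch of the non-nested case described above, assuming the folder names have already come back from a single listing call. It uses `java.time` rather than the Joda-Time types in the PR, treats both window bounds as inclusive, and the names `FlatTimestampFilter` and `selectInWindow` are hypothetical, not Gobblin APIs.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Gobblin: filters folder names from one
// directory listing instead of probing one candidate path per time step.
class FlatTimestampFilter {
  static List<String> selectInWindow(List<String> folderNames, String pattern,
      LocalDateTime start, LocalDateTime end) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    List<String> selected = new ArrayList<>();
    for (String name : folderNames) {
      try {
        LocalDateTime folderTime = LocalDateTime.parse(name, formatter);
        // Keep folders whose timestamp lies in [start, end] (inclusive, by assumption).
        if (!folderTime.isBefore(start) && !folderTime.isAfter(end)) {
          selected.add(name);
        }
      } catch (DateTimeParseException e) {
        // The folder name is not a timestamp in the expected format; skip it.
      }
    }
    return selected;
  }
}
```

The cost is one listing plus one parse per folder, regardless of how many seconds the lookback window spans.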
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926141507


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -170,37 +129,41 @@ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilde
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
-    LocalDateTime endDate = currentTime;
-    LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-    List<FileStatus> fileStatuses = Lists.newArrayList();
+    return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1);
+  }
 
-    // Data inside of nested folders representing timestamps need to be fetched differently
-    if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
-      // Use an iterator that traverses through all times from lookback to current time, based on format
-      DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, this.patternQualifier);
-      while (dateRangeIterator.hasNext()) {
-        Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-        if (!fs.exists(pathWithDateTime)) {
-          continue;
-        }
-        fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
-      }
-    } else {
-      // Look at the top level directories and compare if those fit into the date format
-      Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+  private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath, PathFilter fileFilter, int level) throws IOException {
+    List<FileStatus> fileStatuses = Lists.newArrayList();
+    Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+
+    // Check if at the lowest level/granularity of the date folder
+    if (this.datePattern.split(FileSystems.getDefault().getSeparator()).length == level) {

Review Comment:
   Since `datePattern` doesn't change, the level count could always be calculated in `getFilesAtPath` and decremented on each recursive step.
   
   The same goes for the `endDate` and `startDate` below: could they be calculated once in the helper function?
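
The suggestion above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code that was merged: the directory tree is an in-memory map standing in for `fs.listStatus`, the separator is hard-coded to `/`, and `DateDepthWalker`, `leafDateDirs`, and `walk` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Gobblin implementation: the tree maps a path to
// its child names, standing in for a filesystem listing call.
class DateDepthWalker {
  static List<String> leafDateDirs(Map<String, List<String>> tree, String root, String datePattern) {
    // The pattern never changes, so its depth is computed once here.
    int levels = datePattern.split("/").length;
    List<String> result = new ArrayList<>();
    walk(tree, root, levels, result);
    return result;
  }

  private static void walk(Map<String, List<String>> tree, String path, int remaining, List<String> out) {
    for (String child : tree.getOrDefault(path, Collections.emptyList())) {
      String childPath = path + "/" + child;
      if (remaining == 1) {
        // Lowest granularity of the date folders reached.
        out.add(childPath);
      } else {
        // One level consumed; recurse with the counter decremented.
        walk(tree, childPath, remaining - 1, out);
      }
    }
  }
}
```

Passing a decreasing counter avoids re-splitting the pattern at every level of the recursion.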





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926123411


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
       Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
 
+    // test ds of daily/yyyy-MM-dd-HH-mm-ss
+    datePattern = "yyyy-MM-dd-HH-mm-ss";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+      String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);

Review Comment:
   It shouldn't affect the test: the test uses a lookback of the past 2 days, so the minute/second values don't change the outcome. We just want to make sure that a date partitioned by the second actually gets picked up. (It wasn't before, because the iterator assumed a path existed for every second but only looked for paths in increments of minutes.)
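
One reading of the failure mode described in the comment above, sketched with `java.time` instead of Joda-Time: if candidate folder names are generated by stepping one minute at a time, a folder whose name carries a non-zero seconds field is never among the candidates. `MinuteStepCandidates` and `candidates` are hypothetical names, and this is not the old `DateRangeIterator` itself.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration, not Gobblin code: generates candidate folder names
// by stepping one minute at a time, as a minute-granular iterator would.
class MinuteStepCandidates {
  static Set<String> candidates(LocalDateTime start, LocalDateTime end, String pattern) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    Set<String> names = new HashSet<>();
    for (LocalDateTime t = start; !t.isAfter(end); t = t.plusMinutes(1)) {
      // The seconds field stays fixed at the start value on every step.
      names.add(t.format(formatter));
    }
    return names;
  }
}
```

With a window from 18:00:00 to 18:10:00 on 2022-07-15 and the pattern `yyyy-MM-dd-HH-mm-ss`, the candidates include `2022-07-15-18-05-00` but not `2022-07-15-18-05-33`, so a folder written at second 33 would be skipped.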





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926122575


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
     }
   }
 
-  private boolean isDatePatternHourly(String datePattern) {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
-    String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
-    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
-  }
-
-  private boolean isDatePatternMinutely(String datePattern) {
+  private DatePattern validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
     String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
-    String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
-  }
+    PeriodFormatterBuilder formatterBuilder;
 
-  private boolean isLookbackTimeStringDaily(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
-    try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
-      return false;
+    // Validate that the lookback is supported for the time format
+    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.SECONDLY;
+    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.MINUTELY;
+    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.HOURLY;
+    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) ) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.DAILY;
     }
+    return null;
   }
 
-  private boolean isLookbackTimeStringHourly(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+  private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
     try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
+      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+    } catch (IllegalArgumentException e) {
       return false;
     }
+    return true;
   }
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
     LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
     List<FileStatus> fileStatuses = Lists.newArrayList();
-    while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+
+    // Data inside of nested folders representing timestamps need to be fetched differently
+    if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+      // Use an iterator that traverses through all times from lookback to current time, based on format
+      DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, this.patternQualifier);
+      while (dateRangeIterator.hasNext()) {
+        Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
+        if (!fs.exists(pathWithDateTime)) {
+          continue;
+        }
+        fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+      }
+    } else {
+      // Look at the top level directories and compare if those fit into the date format
+      Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+      while (folderIterator.hasNext()) {
+        Path folderPath = folderIterator.next().getPath();
+        String datePath = folderPath.getName().split("/")[folderPath.getName().split("/").length -1];
+        try {
+          LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+          if (folderDate.isAfter(startDate) && folderDate.isBefore(endDate)) {

Review Comment:
   Yes, you're right. To handle this I had to round down the `startDate`.





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r924843973


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
       Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
 
+    // test ds of daily/yyyy-MM-dd-HH-mm-ss
+    datePattern = "yyyy-MM-dd-HH-mm-ss";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {

Review Comment:
   You're right, but it's just for generating folders for testing, which get cleaned up afterwards. We would also want to test scenarios where there are multiple folders but the lookback covers only a subset of them.





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926120906


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
     }
   }
 
-  private boolean isDatePatternHourly(String datePattern) {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
-    String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
-    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
-  }
-
-  private boolean isDatePatternMinutely(String datePattern) {
+  private DatePattern validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
     String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
-    String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
-  }
+    PeriodFormatterBuilder formatterBuilder;
 
-  private boolean isLookbackTimeStringDaily(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
-    try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
-      return false;
+    // Validate that the lookback is supported for the time format
+    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.SECONDLY;
+    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.MINUTELY;
+    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.HOURLY;
+    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) ) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.DAILY;
     }
+    return null;
   }
 
-  private boolean isLookbackTimeStringHourly(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+  private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
     try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
+      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+    } catch (IllegalArgumentException e) {
       return false;
     }
+    return true;
   }
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
     LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
     List<FileStatus> fileStatuses = Lists.newArrayList();
-    while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+
+    // Data inside of nested folders representing timestamps need to be fetched differently
+    if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+      // Use an iterator that traverses through all times from lookback to current time, based on format
+      DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, this.patternQualifier);
+      while (dateRangeIterator.hasNext()) {
+        Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
+        if (!fs.exists(pathWithDateTime)) {
+          continue;
+        }
+        fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+      }
+    } else {
+      // Look at the top level directories and compare if those fit into the date format
+      Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+      while (folderIterator.hasNext()) {
+        Path folderPath = folderIterator.next().getPath();
+        String datePath = folderPath.getName().split("/")[folderPath.getName().split("/").length -1];

Review Comment:
   Oops, that was a mistake. I changed it in a refactor to address the review below.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r923904554


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
       Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
 
+    // test ds of daily/yyyy-MM-dd-HH-mm-ss
+    datePattern = "yyyy-MM-dd-HH-mm-ss";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {

Review Comment:
   Do we not want to create # directories = min(lookback, `MAX_NUM_DAILY_DIRS`)? More directories could be created than we want if `MAX_NUM_DAILY_DIRS` is greater than the lookback time, right?





[GitHub] [gobblin] ZihanLi58 merged pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
ZihanLi58 merged PR #3528:
URL: https://github.com/apache/gobblin/pull/3528




[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926122676


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -49,14 +52,11 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
   private final String lookbackTime;
   private final String datePattern;
   private final Period lookbackPeriod;
-  private final boolean isPatternDaily;
-  private final boolean isPatternHourly;
-  private final boolean isPatternMinutely;
   private final LocalDateTime currentTime;
-  private final DatePattern pattern;
+  private final DatePattern patternQualifier;
 
   enum DatePattern {
-    MINUTELY, HOURLY, DAILY
+    SECONDLY, MINUTELY, HOURLY, DAILY

Review Comment:
   It turns out we don't need this.





[GitHub] [gobblin] codecov-commenter commented on pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#issuecomment-1185824441

   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3528?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3528](https://codecov.io/gh/apache/gobblin/pull/3528?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a75e5ef) into [master](https://codecov.io/gh/apache/gobblin/commit/7aad1f9ade24ad05ab797e150b6c03a03d925c04?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7aad1f9) will **increase** coverage by `1.82%`.
   > The diff coverage is `67.92%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3528      +/-   ##
   ============================================
   + Coverage     46.61%   48.43%   +1.82%     
   + Complexity    10409     7750    -2659     
   ============================================
     Files          2081     1447     -634     
     Lines         81449    57267   -24182     
     Branches       9090     6603    -2487     
   ============================================
   - Hits          37965    27738   -10227     
   + Misses        39973    26932   -13041     
   + Partials       3511     2597     -914     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3528?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...gement/copy/TimeAwareRecursiveCopyableDataset.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvVGltZUF3YXJlUmVjdXJzaXZlQ29weWFibGVEYXRhc2V0LmphdmE=) | `75.58% <67.92%> (-15.60%)` | :arrow_down: |
   | [...management/partition/CopyableDatasetRequestor.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L3BhcnRpdGlvbi9Db3B5YWJsZURhdGFzZXRSZXF1ZXN0b3IuamF2YQ==) | `35.48% <0.00%> (-9.68%)` | :arrow_down: |
   | [...e/gobblin/runtime/locks/ZookeeperBasedJobLock.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvWm9va2VlcGVyQmFzZWRKb2JMb2NrLmphdmE=) | `57.77% <0.00%> (-6.67%)` | :arrow_down: |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `31.70% <0.00%> (-4.27%)` | :arrow_down: |
   | [...data/management/copy/RecursiveCopyableDataset.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvUmVjdXJzaXZlQ29weWFibGVEYXRhc2V0LmphdmE=) | `86.36% <0.00%> (-3.41%)` | :arrow_down: |
   | [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `79.43% <0.00%> (-0.94%)` | :arrow_down: |
   | [...anagement/copy/replication/ConfigBasedDataset.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvcmVwbGljYXRpb24vQ29uZmlnQmFzZWREYXRhc2V0LmphdmE=) | `68.87% <0.00%> (ø)` | |
   | [.../apache/gobblin/metadata/types/GlobalMetadata.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tbWV0YWRhdGEvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0YWRhdGEvdHlwZXMvR2xvYmFsTWV0YWRhdGEuamF2YQ==) | | |
   | [...apache/gobblin/couchbase/common/TupleDocument.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY291Y2hiYXNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvdWNoYmFzZS9jb21tb24vVHVwbGVEb2N1bWVudC5qYXZh) | | |
   | [.../service/modules/orchestration/AzkabanSuccess.java](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tYXprYWJhbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9zZXJ2aWNlL21vZHVsZXMvb3JjaGVzdHJhdGlvbi9BemthYmFuU3VjY2Vzcy5qYXZh) | | |
   | ... and [647 more](https://codecov.io/gh/apache/gobblin/pull/3528/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3528?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [7aad1f9...a75e5ef](https://codecov.io/gh/apache/gobblin/pull/3528?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926136860


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
       Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
 
+    // test ds of daily/yyyy-MM-dd-HH-mm-ss
+    datePattern = "yyyy-MM-dd-HH-mm-ss";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+      String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);

Review Comment:
   ah, yes.  makes sense--good use for `random`





[GitHub] [gobblin] phet commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r925243204


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -49,14 +52,11 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
   private final String lookbackTime;
   private final String datePattern;
   private final Period lookbackPeriod;
-  private final boolean isPatternDaily;
-  private final boolean isPatternHourly;
-  private final boolean isPatternMinutely;
   private final LocalDateTime currentTime;
-  private final DatePattern pattern;
+  private final DatePattern patternQualifier;
 
   enum DatePattern {
-    MINUTELY, HOURLY, DAILY
+    SECONDLY, MINUTELY, HOURLY, DAILY

Review Comment:
   I can't think of better names (and I understand how these result from applying a general rule), but I do stumble when reading `SECONDLY` and `MINUTELY` according to their common definitions, "of the second thing" and "in fine detail" (!) ;)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -108,6 +101,9 @@ public LocalDateTime next() {
       LocalDateTime dateTime = startDate;
 
       switch (datePattern) {
+        case SECONDLY:
+          startDate = startDate.plusSeconds(1);
+          break;
         case MINUTELY:
           startDate = startDate.plusMinutes(1);

Review Comment:
   it's interesting to see how firmly tied this finder is to specific time semantics (especially time zones).  if, e.g., the dir producer were not time-zone aware (or used a broken/presumed calculation) and created a date folder for 2:30am on the day the finder's TZ "springs forward" (from 2am -> 3am) due to DST, this finder would miss that folder.  probably not a big risk, all things considered, but worth noting
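
To make the DST scenario in the comment above concrete, here is a minimal sketch. It uses `java.time` rather than the Joda-Time classes in the diff, and the zone `America/Los_Angeles` and the class and method names are assumptions chosen only for illustration. It shows that 2:30am does not exist as a wall-clock time on a spring-forward day, and that an iterator stepping on a zone-aware timeline would jump from 01:59 to 03:00. Whether the finder itself is affected depends on how it steps: a zone-less `LocalDateTime` iterator would still produce 02:30.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class DstGapSketch {
  // Assumption: a US Pacific zone, where clocks jumped from 02:00 to 03:00 on 2022-03-13.
  static final ZoneId ZONE = ZoneId.of("America/Los_Angeles");

  /** Returns true if the wall-clock time does not exist in ZONE (it falls in a DST gap). */
  static boolean isInGap(LocalDateTime wallClock) {
    return ZONE.getRules().getValidOffsets(wallClock).isEmpty();
  }

  /** Steps one minute forward on the zone-aware timeline and returns the new wall-clock time. */
  static LocalDateTime nextMinuteZoneAware(LocalDateTime wallClock) {
    return ZonedDateTime.of(wallClock, ZONE).plusMinutes(1).toLocalDateTime();
  }

  public static void main(String[] args) {
    // A folder stamped 02:30 on the spring-forward day names a time that never occurred locally.
    System.out.println(isInGap(LocalDateTime.of(2022, 3, 13, 2, 30)));
    // A zone-aware minute iterator skips the whole 02:00-02:59 hour.
    System.out.println(nextMinuteZoneAware(LocalDateTime.of(2022, 3, 13, 1, 59)));
  }
}
```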



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
       Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
 
+    // test ds of daily/yyyy-MM-dd-HH-mm-ss
+    datePattern = "yyyy-MM-dd-HH-mm-ss";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+      String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);

Review Comment:
   does using random values mean the test is not necessarily deterministic?
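
On the determinism question above, one common option (not necessarily what this PR does) is to seed the generator so every run draws the same minute and second values. The sketch below is illustrative only; the class name, method name, and seed value are hypothetical.

```java
import java.util.Arrays;
import java.util.Random;

final class SeededRandomSketch {
  /** Draws a (minute, second) pair from a generator created with a fixed seed. */
  static int[] minuteAndSecond(long seed) {
    Random random = new Random(seed);
    return new int[] {random.nextInt(60), random.nextInt(60)};
  }

  public static void main(String[] args) {
    // The same seed yields the same pair on every run, so a failure can be reproduced.
    int[] first = minuteAndSecond(42L);
    int[] second = minuteAndSecond(42L);
    System.out.println(Arrays.equals(first, second));
  }
}
```

An unseeded `Random` still keeps the values within valid minute and second ranges, so the choice is mainly about reproducing a failure rather than about correctness of the generated paths.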



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
     }
   }
 
-  private boolean isDatePatternHourly(String datePattern) {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
-    String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
-    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
-  }
-
-  private boolean isDatePatternMinutely(String datePattern) {
+  private DatePattern validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
     String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
-    String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
-  }
+    PeriodFormatterBuilder formatterBuilder;
 
-  private boolean isLookbackTimeStringDaily(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
-    try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
-      return false;
+    // Validate that the lookback is supported for the time format
+    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.SECONDLY;
+    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.MINUTELY;
+    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.HOURLY;
+    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) ) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.DAILY;
     }
+    return null;
   }
 
-  private boolean isLookbackTimeStringHourly(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+  private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
     try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
+      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+    } catch (IllegalArgumentException e) {
       return false;
     }
+    return true;
   }
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
     LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
     List<FileStatus> fileStatuses = Lists.newArrayList();
-    while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+
+    // Data inside of nested folders representing timestamps need to be fetched differently
+    if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+      // Use an iterator that traverses through all times from lookback to current time, based on format
+      DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, this.patternQualifier);
+      while (dateRangeIterator.hasNext()) {
+        Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
+        if (!fs.exists(pathWithDateTime)) {
+          continue;
+        }
+        fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+      }
+    } else {
+      // Look at the top level directories and compare if those fit into the date format
+      Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+      while (folderIterator.hasNext()) {
+        Path folderPath = folderIterator.next().getPath();
+        String datePath = folderPath.getName().split("/")[folderPath.getName().split("/").length -1];
+        try {
+          LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+          if (folderDate.isAfter(startDate) && folderDate.isBefore(endDate)) {

Review Comment:
   the iterator-based approach used `!isAfter(endDate)`, so wouldn't the equivalent here be "is before or equal to" `endDate`?
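
The boundary difference raised above can be shown in a few lines. This is a sketch using `java.time.LocalDateTime` as a stand-in for the Joda-Time type in the diff (both offer `isBefore` and `isAfter`); the class and method names are hypothetical.

```java
import java.time.LocalDateTime;

final class EndBoundSketch {
  /** Exclusive end bound, as written in the new else-branch: a folder stamped exactly at end is dropped. */
  static boolean beforeEnd(LocalDateTime folderDate, LocalDateTime endDate) {
    return folderDate.isBefore(endDate);
  }

  /** Inclusive end bound, matching an iterator that runs while !isAfter(endDate). */
  static boolean notAfterEnd(LocalDateTime folderDate, LocalDateTime endDate) {
    return !folderDate.isAfter(endDate);
  }

  public static void main(String[] args) {
    LocalDateTime end = LocalDateTime.of(2022, 7, 15, 18, 5, 33);
    // The two checks disagree only when the folder timestamp equals the end of the range.
    System.out.println(beforeEnd(end, end));
    System.out.println(notAfterEnd(end, end));
  }
}
```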



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
     }
   }
 
-  private boolean isDatePatternHourly(String datePattern) {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
-    String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
-    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
-  }
-
-  private boolean isDatePatternMinutely(String datePattern) {
+  private DatePattern validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
     String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
-    String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
-  }
+    PeriodFormatterBuilder formatterBuilder;
 
-  private boolean isLookbackTimeStringDaily(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
-    try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
-      return false;
+    // Validate that the lookback is supported for the time format
+    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.SECONDLY;
+    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.MINUTELY;
+    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.HOURLY;
+    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) ) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.DAILY;
     }
+    return null;
   }
 
-  private boolean isLookbackTimeStringHourly(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+  private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
     try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
+      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+    } catch (IllegalArgumentException e) {
       return false;
     }
+    return true;
   }
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
     LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
     List<FileStatus> fileStatuses = Lists.newArrayList();
-    while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+
+    // Data inside of nested folders representing timestamps need to be fetched differently
+    if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+      // Use an iterator that traverses through all times from lookback to current time, based on format
+      DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, this.patternQualifier);
+      while (dateRangeIterator.hasNext()) {
+        Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
+        if (!fs.exists(pathWithDateTime)) {

Review Comment:
   wow, that's a lot of FS stat calls: from 1440 per day when by-minute to 86400 per day when by-second!  I'm sure we all agree "premature optimization is the root of all evil"... but still I have serious concerns that such scanning will exceed the acceptable load on the host FS.  worst would be if users default to this as a way to 'keep their options open', even when in practice they don't require nearly such fine-grained differentiation between the dirs they create.
   
   (clearly my suggestions here imply a fundamental rethinking of the impl...)
   
   first off I note that the iterator's stride is always just one.  a sequential scan of the entire space is generally unnecessary merely to test interval membership.  I expect that was the same intuition leading you to develop the `else` case (when no dir separator detected)... I'm just suggesting you go farther.
   
   overall my expectation is of an infrequent/sparse hit-rate from this probing, since most producers won't be creating a new snapshot/partition every/most seconds or every/most minutes.  if borne out, it would likely be far more efficient (although somewhat more complex to implement) to do a targeted directory child listing, with filtering of the results, as opposed to constructing every possible concrete path, only to find that the vast majority do not actually exist.
   
   if you feel such concerns are warranted, I'm happy to suggest a simple/efficient implementation for targeted dir listing
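
As a rough illustration of the "list, then filter" idea described above, here is a minimal sketch. It is not the reviewer's proposed implementation: it uses `java.nio.file` and `java.time` instead of Hadoop's `FileSystem` and Joda-Time, all class and method names are hypothetical, and it assumes both range bounds are inclusive. It walks only directories that actually exist, down to the depth of the date pattern, and keeps those whose relative path parses to a timestamp inside the range. It does not prune by year or month at intermediate levels, which a fuller version probably would.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TargetedListingSketch {

  /** Joins the components of dir relative to root with "/", independent of the OS separator. */
  static String relativeName(Path root, Path dir) {
    List<String> parts = new ArrayList<>();
    for (Path part : root.relativize(dir)) {
      parts.add(part.toString());
    }
    return String.join("/", parts);
  }

  /** Lists existing directories at the pattern's depth and keeps those dated within [start, end]. */
  static List<String> findInRange(Path root, String pattern, LocalDateTime start, LocalDateTime end)
      throws IOException {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    int depth = pattern.split("/").length;
    List<Path> candidates;
    try (Stream<Path> walk = Files.walk(root, depth)) {
      candidates = walk
          .filter(p -> !p.equals(root) && Files.isDirectory(p))
          .filter(p -> root.relativize(p).getNameCount() == depth)
          .collect(Collectors.toList());
    }
    List<String> matches = new ArrayList<>();
    for (Path dir : candidates) {
      String name = relativeName(root, dir);
      try {
        LocalDateTime timestamp = LocalDateTime.parse(name, formatter);
        if (!timestamp.isBefore(start) && !timestamp.isAfter(end)) {
          matches.add(name);
        }
      } catch (DateTimeParseException e) {
        // Directory name does not follow the date pattern; skip it.
      }
    }
    return matches;
  }

  /** Builds two timestamped directories in a temp dir and searches a one-day range. */
  static List<String> runDemo() {
    try {
      Path root = Files.createTempDirectory("targeted-listing");
      Files.createDirectories(root.resolve("2022").resolve("07").resolve("15").resolve("10").resolve("30"));
      Files.createDirectories(root.resolve("2022").resolve("07").resolve("10").resolve("01").resolve("00"));
      return findInRange(root, "yyyy/MM/dd/HH/mm",
          LocalDateTime.of(2022, 7, 15, 0, 0), LocalDateTime.of(2022, 7, 16, 0, 0));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

With this shape the number of filesystem calls scales with the number of directories that exist, not with the number of seconds or minutes in the lookback window.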





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3528: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time…

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r925039376


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
     }
   }
 
-  private boolean isDatePatternHourly(String datePattern) {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
-    String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
-    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
-  }
-
-  private boolean isDatePatternMinutely(String datePattern) {
+  private DatePattern validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
     String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
-    String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
-  }
+    PeriodFormatterBuilder formatterBuilder;
 
-  private boolean isLookbackTimeStringDaily(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
-    try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
-      return false;
+    // Validate that the lookback is supported for the time format
+    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.SECONDLY;
+    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.MINUTELY;
+    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.HOURLY;
+    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) ) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+      }
+      return DatePattern.DAILY;
     }
+    return null;
   }
 
-  private boolean isLookbackTimeStringHourly(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+  private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
     try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
+      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+    } catch (IllegalArgumentException e) {
       return false;
     }
+    return true;
   }
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
     LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
     List<FileStatus> fileStatuses = Lists.newArrayList();
-    while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+
+    // Data inside of nested folders representing timestamps need to be fetched differently
+    if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+      // Use an iterator that traverses through all times from lookback to current time, based on format
+      DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, this.patternQualifier);
+      while (dateRangeIterator.hasNext()) {
+        Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
+        if (!fs.exists(pathWithDateTime)) {
+          continue;
+        }
+        fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+      }
+    } else {
+      // Look at the top level directories and compare if those fit into the date format
+      Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+      while (folderIterator.hasNext()) {
+        Path folderPath = folderIterator.next().getPath();
+        String datePath = folderPath.getName().split("/")[folderPath.getName().split("/").length -1];

Review Comment:
   What is this used for? Doesn't `Path.getName()` already return the name of the current folder?
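
The point above can be checked with a small sketch. It uses `java.nio.file.Path.getFileName()` as a stand-in for Hadoop's `Path.getName()`, which also returns the final path component; the class name, method names, and sample path are hypothetical. Because the final component contains no `/`, splitting it and taking the last element yields the same string.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class PathNameSketch {
  /** Final component of the path, analogous to Hadoop's Path.getName(). */
  static String lastComponent(Path path) {
    return path.getFileName().toString();
  }

  /** The same value obtained the long way, mirroring the split in the diff. */
  static String lastComponentViaSplit(Path path) {
    String[] parts = lastComponent(path).split("/");
    return parts[parts.length - 1];
  }

  public static void main(String[] args) {
    Path folder = Paths.get("/data/daily/2022-07-15-10-30-45");
    // Both calls return the folder name, so the split adds nothing.
    System.out.println(lastComponent(folder).equals(lastComponentViaSplit(folder)));
  }
}
```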


