You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/07/21 20:16:00 UTC

[gobblin] branch master updated: [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (#3528)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 930055460 [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (#3528)
930055460 is described below

commit 930055460043bd7427508e6b3a47078df50c0f00
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Jul 21 13:15:54 2022 -0700

    [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (#3528)
    
    * Clean up TimeAwareRecursiveCopyableDataset to support seconds in timestamp, squash logs for timestamp paths that do not exist, and improve efficiency on search for non-nested timestamps
    
    * Fix checkstyle
    
    * Address review
    
    * Refactor based on reviews to handle every date pattern format recursively without the iterator
    
    * Calculate start and end date in helper
---
 .../copy/TimeAwareRecursiveCopyableDataset.java    | 190 ++++++++++-----------
 .../management/copy/DateRangeIteratorTest.java     |  67 --------
 .../TimeAwareRecursiveCopyableDatasetTest.java     |  45 ++++-
 3 files changed, 133 insertions(+), 169 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index 73bc721e8..e0cdd9899 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -18,10 +18,13 @@
 package org.apache.gobblin.data.management.copy;
 
 import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,11 +37,11 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 
+@Slf4j
 public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset {
   private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
   public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern";
@@ -49,134 +52,119 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
   private final String lookbackTime;
   private final String datePattern;
   private final Period lookbackPeriod;
-  private final boolean isPatternDaily;
-  private final boolean isPatternHourly;
-  private final boolean isPatternMinutely;
   private final LocalDateTime currentTime;
-  private final DatePattern pattern;
-
-  enum DatePattern {
-    MINUTELY, HOURLY, DAILY
-  }
 
   public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
     super(fs, rootPath, properties, glob);
     this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY);
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").toFormatter();
+    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays()
+        .appendSuffix("d")
+        .appendHours()
+        .appendSuffix("h")
+        .appendMinutes()
+        .appendSuffix("m")
+        .toFormatter();
     this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
     this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
-    this.isPatternMinutely = isDatePatternMinutely(datePattern);
-    this.isPatternHourly = !this.isPatternMinutely && isDatePatternHourly(datePattern);
-    this.isPatternDaily = !this.isPatternMinutely && !this.isPatternHourly;
+
     this.currentTime = properties.containsKey(DATE_PATTERN_TIMEZONE_KEY) ? LocalDateTime.now(
         DateTimeZone.forID(DATE_PATTERN_TIMEZONE_KEY))
         : LocalDateTime.now(DateTimeZone.forID(DEFAULT_DATE_PATTERN_TIMEZONE));
 
-    if (this.isPatternDaily) {
-      Preconditions.checkArgument(isLookbackTimeStringDaily(this.lookbackTime), "Expected day format for lookback time; found hourly or minutely format");
-      pattern = DatePattern.DAILY;
-    } else if (this.isPatternHourly) {
-      Preconditions.checkArgument(isLookbackTimeStringHourly(this.lookbackTime), "Expected hourly format for lookback time; found minutely format");
-      pattern = DatePattern.HOURLY;
-    } else {
-      pattern = DatePattern.MINUTELY;
-    }
+    this.validateLookbackWithDatePatternFormat(this.datePattern, this.lookbackTime);
   }
 
-  /**
-   * TODO: Replace it with {@link org.apache.gobblin.time.TimeIterator} as {@link LocalDateTime} will not adjust time
-   * to a given time zone
-   */
-  public static class DateRangeIterator implements Iterator {
-    private LocalDateTime startDate;
-    private LocalDateTime endDate;
-    private DatePattern datePattern;
-
-    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, DatePattern datePattern) {
-      this.startDate = startDate;
-      this.endDate = endDate;
-      this.datePattern = datePattern;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return !startDate.isAfter(endDate);
-    }
-
-    @Override
-    public LocalDateTime next() {
-      LocalDateTime dateTime = startDate;
-
-      switch (datePattern) {
-        case MINUTELY:
-          startDate = startDate.plusMinutes(1);
-          break;
-        case HOURLY:
-          startDate = startDate.plusHours(1);
-          break;
-        case DAILY:
-          startDate = startDate.plusDays(1);
-          break;
-      }
-
-      return dateTime;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private boolean isDatePatternHourly(String datePattern) {
+  void validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
     String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
-    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
-  }
-
-  private boolean isDatePatternMinutely(String datePattern) {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
-    String refDateTimeString = refDateTime.toString(formatter);
-    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
-    String refDateTimeStringAtStartOfHour = refDateTimeAtStartOfHour.toString(formatter);
-    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
-  }
-
-  private boolean isLookbackTimeStringDaily(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
-    try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
-      return false;
+    PeriodFormatterBuilder formatterBuilder;
+
+    // Validate that the lookback is supported for the time format
+    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays()
+          .appendSuffix("d")
+          .appendHours()
+          .appendSuffix("h")
+          .appendMinutes()
+          .appendSuffix("m")
+          .appendSeconds()
+          .appendSuffix("s");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely or secondly format, check %s",
+            LOOKBACK_TIME_KEY));
+      }
+    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays()
+          .appendSuffix("d")
+          .appendHours()
+          .appendSuffix("h")
+          .appendMinutes()
+          .appendSuffix("m");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly or minutely format, check %s",
+            LOOKBACK_TIME_KEY));
+      }
+    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+      }
+    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString)) {
+      formatterBuilder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
+      if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+        throw new IllegalArgumentException(String.format("Expected lookback time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+      }
     }
   }
 
-  private boolean isLookbackTimeStringHourly(String lookbackTime) {
-    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+  private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
     try {
-      periodFormatter.parsePeriod(lookbackTime);
-      return true;
-    } catch (Exception e) {
+      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+    } catch (IllegalArgumentException e) {
       return false;
     }
+    return true;
   }
 
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
     LocalDateTime endDate = currentTime;
-    LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
+    LocalDateTime startDate = formatter.parseLocalDateTime(endDate.minus(this.lookbackPeriod).toString(this.datePattern));
+    return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1, startDate, endDate, formatter);
+  }
 
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
+  private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath, PathFilter fileFilter,
+      int level,  LocalDateTime startDate, LocalDateTime endDate, DateTimeFormatter formatter) throws IOException {
     List<FileStatus> fileStatuses = Lists.newArrayList();
-    while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
-      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
+    Iterator<FileStatus> folderIterator = Arrays.asList(fs.listStatus(path)).iterator();
+
+    // Check if at the lowest level/granularity of the date folder
+    if (this.datePattern.split(FileSystems.getDefault().getSeparator()).length == level) {
+      // startDate was already truncated by the caller to the most granular unit of time in the date pattern
+      while (folderIterator.hasNext()) {
+        Path folderPath = folderIterator.next().getPath();
+        String datePath = traversedDatePath.isEmpty() ? folderPath.getName() : new Path(traversedDatePath, folderPath.getName()).toString();
+        try {
+          LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+          if (!folderDate.isBefore(startDate) && !folderDate.isAfter(endDate)) {
+            fileStatuses.addAll(super.getFilesAtPath(fs, folderPath, fileFilter));
+          }
+        } catch (IllegalArgumentException e) {
+          log.warn(String.format(
+              "Folder at path %s is not convertible to format %s. Please confirm that argument %s is valid", datePath,
+              this.datePattern, DATE_PATTERN_KEY));
+        }
+      }
+    } else {
+      // folder has a nested format such as yyyy/MM/dd/HH, so recursively find date paths
+      while (folderIterator.hasNext()) {
+        // Start building the date from top-down
+        String nextDate = folderIterator.next().getPath().getName();
+        String datePath = traversedDatePath.isEmpty() ?  nextDate : new Path(traversedDatePath, nextDate).toString();
+        fileStatuses.addAll(recursivelyGetFilesAtDatePath(fs, new Path(path, nextDate), datePath, fileFilter, level + 1, startDate, endDate, formatter));
+      }
     }
     return fileStatuses;
   }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
deleted file mode 100644
index 47e72d672..000000000
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.copy;
-
-import org.joda.time.LocalDateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
-
-public class DateRangeIteratorTest {
-
-  @Test
-  public void testIterator() {
-    LocalDateTime endDate = new LocalDateTime(2017, 1, 1, 0, 0, 0);
-    LocalDateTime startDate = endDate.minusHours(2);
-    String datePattern = "HH/yyyy/MM/dd";
-    DateTimeFormatter format = DateTimeFormat.forPattern(datePattern);
-    TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
-        new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, TimeAwareRecursiveCopyableDataset.DatePattern.HOURLY);
-    LocalDateTime dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "22/2016/12/31");
-    dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "23/2016/12/31");
-    dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "00/2017/01/01");
-    Assert.assertEquals(dateRangeIterator.hasNext(), false);
-
-    datePattern = "yyyy/MM/dd";
-    format = DateTimeFormat.forPattern(datePattern);
-    startDate = endDate.minusDays(1);
-    dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, TimeAwareRecursiveCopyableDataset.DatePattern.DAILY);
-    dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "2016/12/31");
-    dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "2017/01/01");
-    Assert.assertEquals(dateRangeIterator.hasNext(), false);
-
-    datePattern = "yyyy-MM-dd-HH-mm";
-    format = DateTimeFormat.forPattern(datePattern);
-    startDate = endDate.minusHours(1);
-    dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, TimeAwareRecursiveCopyableDataset.DatePattern.MINUTELY);
-    dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-00");
-    dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-01");
-    Assert.assertEquals(dateRangeIterator.hasNext(), true);
-  }
-}
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
index b684936bb..521caaafe 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,12 +45,13 @@ import org.testng.annotations.Test;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.filters.HiddenFilter;
 
-
+@Slf4j
 public class TimeAwareRecursiveCopyableDatasetTest {
   private FileSystem fs;
   private Path baseDir1;
   private Path baseDir2;
   private Path baseDir3;
+  private Path baseDir4;
 
   private static final String NUM_LOOKBACK_DAYS_STR = "2d";
   private static final Integer NUM_LOOKBACK_DAYS = 2;
@@ -85,6 +87,12 @@ public class TimeAwareRecursiveCopyableDatasetTest {
       fs.delete(baseDir3, true);
     }
     fs.mkdirs(baseDir3);
+
+    baseDir4 = new Path("/tmp/src/ds3/daily");
+    if (fs.exists(baseDir4)) {
+      fs.delete(baseDir4, true);
+    }
+    fs.mkdirs(baseDir4);
     PeriodFormatter formatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
     Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
   }
@@ -216,6 +224,40 @@ public class TimeAwareRecursiveCopyableDatasetTest {
       Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
 
+    // test ds of daily/yyyy-MM-dd-HH-mm-ss
+    datePattern = "yyyy-MM-dd-HH-mm-ss";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+      String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);
+      if (i == 0) {
+        // avoid minutes/seconds that lie in the future, so the test result stays consistent
+        startDate = endDate.minusHours(i).withMinuteOfHour(0).withSecondOfMinute(0).toString(formatter);
+      }
+      Path subDirPath = new Path(baseDir4, new Path(startDate));
+      fs.mkdirs(subDirPath);
+      Path filePath = new Path(subDirPath, i + ".avro");
+      fs.create(filePath);
+      if (i < (NUM_LOOKBACK_DAYS + 1)) {
+        candidateFiles.add(filePath.toString());
+      }
+    }
+
+    properties = new Properties();
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, "2d1h");
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH-mm-ss");
+
+    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir4, properties,
+        new Path("/tmp/src/ds3/daily"));
+
+    fileStatusList = dataset.getFilesAtPath(fs, baseDir4, pathFilter);
+
+    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
+    for (FileStatus fileStatus: fileStatusList) {
+      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+    }
   }
 
   @Test (expectedExceptions = IllegalArgumentException.class)
@@ -244,5 +286,6 @@ public class TimeAwareRecursiveCopyableDatasetTest {
     this.fs.delete(baseDir1, true);
     this.fs.delete(baseDir2, true);
     this.fs.delete(baseDir3, true);
+    this.fs.delete(baseDir4, true);
   }
 }
\ No newline at end of file