You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/03/25 20:38:34 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1096] Work with DST change in compaction watermark

This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a88add0  [GOBBLIN-1096] Work with DST change in compaction watermark
a88add0 is described below

commit a88add02b67aad6949a6b1d5b1a92bfb870b8c56
Author: zhchen <zh...@linkedin.com>
AuthorDate: Wed Mar 25 13:38:20 2020 -0700

    [GOBBLIN-1096] Work with DST change in compaction watermark
    
    Closes #2937 from zxcware/dst
---
 .../action/CompactionWatermarkAction.java          |  7 +++-
 .../action/CompactionWatermarkActionTest.java      | 48 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
index 2c6e1c2..cd3d98d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -28,6 +28,7 @@ import com.google.common.base.Optional;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
 import org.apache.gobblin.configuration.State;
@@ -56,11 +57,13 @@ public class CompactionWatermarkAction implements CompactionCompleteAction<FileS
   private State state;
   private final String defaultHiveDb;
   private final TimeIterator.Granularity granularity;
+  private final ZoneId zone;  // zone for watermark day boundaries (MRCompactor.COMPACTION_TIMEZONE)
 
   public CompactionWatermarkAction(State state) {
     this.state = state;
     defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
     granularity = TimeIterator.Granularity.valueOf(state.getProp(GRANULARITY).toUpperCase());
+    zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
   }
 
   @Override
@@ -138,8 +141,8 @@ public class CompactionWatermarkAction implements CompactionCompleteAction<FileS
    * unit of {@link #granularity}
    */
   private long getExpectedNextWatermark(Long previousWatermark) {
-    ZonedDateTime previousWatermarkTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark),
-        ZoneId.systemDefault());
+    ZonedDateTime previousWatermarkTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark), zone);
+    // TimeIterator.inc steps the zoned time, so a DST-change DAY is not 24h (see testWatermarkWithDST)
     ZonedDateTime nextWatermarkTime = TimeIterator.inc(previousWatermarkTime, granularity, 1);
     return nextWatermarkTime.toInstant().toEpochMilli();
   }
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
index 038268c..16f33d6 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
@@ -101,6 +101,54 @@ public class CompactionWatermarkActionTest {
     doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);
   }
 
+  @Test
+  public void testWatermarkWithDST() throws Exception {
+    // Test case 1: a zone with a DST switch on 2020-03-08 (02:00 PST -> 03:00 PDT)
+    // Time zone: America/Los_Angeles (PST, UTC-8, before the switch; PDT, UTC-7, after)
+    // Existing watermark millis: 1583654399999 (2020-03-07 23:59:59.999 PST)
+    // Actual watermark millis: 1583737199999 (2020-03-08 23:59:59.999 PDT), only 23h later
+    testWatermarkWithDSTTimeZone("America/Los_Angeles", "1583654399999", "1583737199999");
+    // Test case 2: a zone without DST, where the same calendar day is a full 24h
+    // Time zone: UTC
+    // Existing watermark millis: 1583625599999 (2020-03-07 23:59:59.999 UTC)
+    // Actual watermark millis: 1583711999999 (2020-03-08 23:59:59.999 UTC), exactly 24h later
+    testWatermarkWithDSTTimeZone("UTC", "1583625599999", "1583711999999");
+  }
+
+  private void testWatermarkWithDSTTimeZone(String timeZone, String existingWatermark, String actualWatermark)
+      throws Exception {
+    String db = "db1";
+    String table = "table1";
+    String dataset = "db1/table1";
+    State state = new State();
+    String defaultDb = "tracking";
+    state.setProp(CompactionWatermarkAction.DEFAULT_HIVE_DB, defaultDb);
+
+    String inputDir = "/data/tracking";
+    String inputSubDir = "hourly";
+    String destSubDir = "daily";
+    String datasetPath = String.format("%s/%s/%s/2020/03/08", inputDir, dataset, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir);
+    state.setProp(HiveRegister.HIVE_REGISTER_TYPE, MockHiveRegister.class.getName());
+    state.setProp(CompactionWatermarkAction.GRANULARITY, "DAY");  // watermark advances one day per step
+    state.setProp(MRCompactor.COMPACTION_TIMEZONE, timeZone);  // zone the action uses for day boundaries
+
+    State tableProps = new State();
+    tableProps.setProp(compactionWatermark, existingWatermark);
+    tableProps.setProp(completionCompactionWatermark, existingWatermark);
+    HiveTable existingTable = new HiveTable.Builder().withDbName(db).withTableName(table)
+        .withProps(tableProps).build();
+    MockHiveRegister.existingTable = existingTable;  // static field; overwritten per test case
+
+    CompactionWatermarkAction action = new CompactionWatermarkAction(state);
+    FileSystemDataset fsDataset = new SimpleFileSystemDataset(new Path(datasetPath));
+
+    doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);  // expected == actual
+  }
+
   private void doWatermarkTest(CompactionWatermarkAction action, FileSystemDataset fsDataset,
       State state, String actualWatermark, String expectedWatermark)
       throws Exception {