You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/03/25 20:38:34 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1096] Work with DST change in compaction watermark
This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a88add0 [GOBBLIN-1096] Work with DST change in compaction watermark
a88add0 is described below
commit a88add02b67aad6949a6b1d5b1a92bfb870b8c56
Author: zhchen <zh...@linkedin.com>
AuthorDate: Wed Mar 25 13:38:20 2020 -0700
[GOBBLIN-1096] Work with DST change in compaction watermark
Closes #2937 from zxcware/dst
---
.../action/CompactionWatermarkAction.java | 7 +++-
.../action/CompactionWatermarkActionTest.java | 48 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
index 2c6e1c2..cd3d98d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -28,6 +28,7 @@ import com.google.common.base.Optional;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
import org.apache.gobblin.configuration.State;
@@ -56,11 +57,13 @@ public class CompactionWatermarkAction implements CompactionCompleteAction<FileS
private State state;
private final String defaultHiveDb;
private final TimeIterator.Granularity granularity;
+ private final ZoneId zone;
public CompactionWatermarkAction(State state) {
this.state = state;
defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
granularity = TimeIterator.Granularity.valueOf(state.getProp(GRANULARITY).toUpperCase());
+ zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE)); // compaction time zone from job config, falling back to MRCompactor.DEFAULT_COMPACTION_TIMEZONE
}
@Override
@@ -138,8 +141,8 @@ public class CompactionWatermarkAction implements CompactionCompleteAction<FileS
* unit of {@link #granularity}
*/
private long getExpectedNextWatermark(Long previousWatermark) {
- ZonedDateTime previousWatermarkTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark),
- ZoneId.systemDefault());
+ ZonedDateTime previousWatermarkTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark), zone);
+ // The watermark is interpreted in the configured compaction zone (not the JVM default) so that java.time (Java 8+) can apply that zone's DST rules, e.g. America/Los_Angeles, when it is advanced below
ZonedDateTime nextWatermarkTime = TimeIterator.inc(previousWatermarkTime, granularity, 1);
return nextWatermarkTime.toInstant().toEpochMilli();
}
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
index 038268c..16f33d6 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
@@ -101,6 +101,54 @@ public class CompactionWatermarkActionTest {
doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);
}
+ @Test
+ public void testWatermarkWithDST() throws Exception {
+ // Test case 1: zone that observes DST
+ // Time zone: America/Los_Angeles (Pacific Time)
+ // Existing watermark millis: 1583654399999 (2020-03-07 23:59:59.999 PST)
+ // Actual watermark millis: 1583737199999 (2020-03-08 23:59:59.999 PDT), only 23 hours later because DST started on 2020-03-08
+ testWatermarkWithDSTTimeZone("America/Los_Angeles", "1583654399999", "1583737199999");
+ // Test case 2: zone without DST
+ // Time zone: UTC
+ // Existing watermark millis: 1583625599999 (2020-03-07 23:59:59.999 UTC)
+ // Actual watermark millis: 1583711999999 (2020-03-08 23:59:59.999 UTC), exactly 24 hours later
+ testWatermarkWithDSTTimeZone("UTC", "1583625599999", "1583711999999");
+ }
+
+ private void testWatermarkWithDSTTimeZone(String timeZone, String existingWatermark, String actualWatermark)
+ throws Exception { // builds a DAY-granularity action for the given zone and delegates the check to doWatermarkTest
+ String db = "db1";
+ String table = "table1";
+ String dataset = "db1/table1"; // same db and table names as above, joined as a path
+ State state = new State();
+ String defaultDb = "tracking";
+ state.setProp(CompactionWatermarkAction.DEFAULT_HIVE_DB, defaultDb);
+
+ String inputDir = "/data/tracking";
+ String inputSubDir = "hourly";
+ String destSubDir = "daily";
+ String datasetPath = String.format("%s/%s/%s/2020/03/08", inputDir, dataset, inputSubDir); // resolves to /data/tracking/db1/table1/hourly/2020/03/08
+ state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir);
+ state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir); // destination root is deliberately the same as the input root
+ state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir);
+ state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir);
+ state.setProp(HiveRegister.HIVE_REGISTER_TYPE, MockHiveRegister.class.getName());
+ state.setProp(CompactionWatermarkAction.GRANULARITY, "DAY"); // the next watermark is expected one day after the existing one
+ state.setProp(MRCompactor.COMPACTION_TIMEZONE, timeZone); // zone under test; read by the CompactionWatermarkAction constructor
+
+ State tableProps = new State();
+ tableProps.setProp(compactionWatermark, existingWatermark); // both watermark properties start at the same existing value
+ tableProps.setProp(completionCompactionWatermark, existingWatermark);
+ HiveTable existingTable = new HiveTable.Builder().withDbName(db).withTableName(table)
+ .withProps(tableProps).build();
+ MockHiveRegister.existingTable = existingTable; // static field on the mock; presumably served back as the already-registered table (verify in MockHiveRegister)
+
+ CompactionWatermarkAction action = new CompactionWatermarkAction(state);
+ FileSystemDataset fsDataset = new SimpleFileSystemDataset(new Path(datasetPath));
+
+ doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark); // the same value is passed as both the actual and the expected watermark
+ }
+
private void doWatermarkTest(CompactionWatermarkAction action, FileSystemDataset fsDataset,
State state, String actualWatermark, String expectedWatermark)
throws Exception {