You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/07/29 08:15:37 UTC
incubator-gobblin git commit: Add lead time functionality to
DatePartitionedRetrievers
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 3b64cf70f -> 85d724c64
Add lead time functionality to DatePartitionedRetrievers
Add lead time functionality to DatePartitionedRetrievers. Directories whose
partition time is later than (now() - lead time) will be ignored by the source.
minor style tweaks
Closes #2003 from eogren/datepartitionleadtime
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/85d724c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/85d724c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/85d724c6
Branch: refs/heads/master
Commit: 85d724c64dc5ab842992a19bdfa59b3ee3d8c24f
Parents: 3b64cf7
Author: Eric Ogren <eo...@linkedin.com>
Authored: Sat Jul 29 01:15:33 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Sat Jul 29 01:15:33 2017 -0700
----------------------------------------------------------------------
.../source/DatePartitionedNestedRetriever.java | 6 ++-
.../PartitionAwareFileRetrieverUtils.java | 55 ++++++++++++++++++++
.../source/PartitionedFileSourceBase.java | 16 ++++++
.../source/RegexBasedPartitionedRetriever.java | 16 ++++--
.../RegexBasedPartitionedRetrieverTest.java | 26 ++++++++-
.../DatePartitionedAvroFileExtractorTest.java | 43 +++++++++++++--
.../java/gobblin/util/DatePartitionType.java | 19 +++++--
.../gobblin/util/DatePartitionTypeTest.java | 29 +++++++++++
8 files changed, 194 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
index 0dc2f9c..c0bc1dc 100644
--- a/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
import org.joda.time.DurationFieldType;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -46,6 +47,7 @@ import gobblin.util.DatePartitionType;
import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN;
+
/**
* PartitionRetriever that is optimized for nested directory structures where data is dumped on a regular basis
* and most data has likely been processed by Gobblin already.
@@ -69,6 +71,7 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
private FileSystem fs;
private HadoopFsHelper helper;
private final String expectedExtension;
+ private Duration leadTimeDuration;
public DatePartitionedNestedRetriever(String expectedExtension) {
this.expectedExtension = expectedExtension;
@@ -86,13 +89,14 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
this.sourcePartitionSuffix =
state.getProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX, StringUtils.EMPTY);
this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+ this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state);
this.helper = new HadoopFsHelper(state);
}
@Override
public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn)
throws IOException {
- DateTime currentDay = new DateTime();
+ DateTime currentDay = new DateTime().minus(leadTimeDuration);
DateTime lowWaterMarkDate = new DateTime(minWatermark);
List<FileInfo> filesToProcess = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java b/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java
new file mode 100644
index 0000000..926267d
--- /dev/null
+++ b/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gobblin.source;
+
+import org.joda.time.Duration;
+
+import gobblin.configuration.State;
+import gobblin.util.DatePartitionType;
+
+import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME;
+import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY;
+import static gobblin.source.PartitionedFileSourceBase.DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY;
+import static gobblin.source.PartitionedFileSourceBase.DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME;
+
+
+/**
+ * Utility functions for parsing configuration parameters commonly used by {@link PartitionAwareFileRetriever}
+ * objects.
+ */
+public class PartitionAwareFileRetrieverUtils {
+ /**
+ * Build the lead time duration from the LEAD_TIME and LEAD_TIME_GRANULARITY config settings (defaults apply when a setting is absent).
+ */
+ public static Duration getLeadTimeDurationFromConfig(State state) {
+ String leadTimeProp = state.getProp(DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME);
+ if (leadTimeProp == null || leadTimeProp.length() == 0) {
+ return DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME;
+ }
+
+ int leadTime = Integer.parseInt(leadTimeProp);
+
+ DatePartitionType leadTimeGranularity = DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY;
+
+ String leadTimeGranularityProp = state.getProp(DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY);
+ if (leadTimeGranularityProp != null) {
+ leadTimeGranularity = DatePartitionType.valueOf(leadTimeGranularityProp);
+ }
+
+ return new Duration(leadTime * leadTimeGranularity.getUnitMilliseconds());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
index 041538c..de26450 100644
--- a/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
@@ -77,6 +78,21 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
DatePartitionType.HOUR;
/**
+ * The partition 'lead time' makes a job ignore a date partition until that partition is at least this old.
+ * For example, if the lead time is 1 with a granularity of DAY and the job is run on Jul 1 2017 at 2am, the job
+ * will only process partitions dated Jun 30 2017 at 2am or earlier.
+ */
+ public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME =
+ DATE_PARTITIONED_SOURCE_PREFIX + ".partition.lead_time.size";
+ public static final Duration DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME = new Duration(0);
+
+ public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY =
+ DATE_PARTITIONED_SOURCE_PREFIX + ".partition.lead_time.granularity";
+
+ public static final DatePartitionType DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY =
+ DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY;
+
+ /**
* A String of the format defined by {@link #DATE_PARTITIONED_SOURCE_PARTITION_PATTERN} or
* {@link #DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY}. For example for yyyy/MM/dd the
* 2015/01/01 corresponds to January 1st, 2015. The job will start reading data from this point in time.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
index 9f43b6c..8ad8341 100644
--- a/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
+++ b/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
private HadoopFsHelper helper;
private Path sourceDir;
private final String expectedExtension;
+ private Duration leadTime;
public RegexBasedPartitionedRetriever(String expectedExtension) {
this.expectedExtension = expectedExtension;
@@ -57,6 +60,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN
);
+ this.leadTime = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state);
this.pattern = Pattern.compile(regexPattern);
this.helper = new HadoopFsHelper(state);
this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
@@ -87,12 +91,14 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
throws IOException {
// This implementation assumes snapshots are always in the root directory and the number of them
// remains relatively small
+ long maxAllowedWatermark = new DateTime().minus(leadTime).getMillis();
+
try {
this.helper.connect();
FileSystem fs = helper.getFileSystem();
List<FileInfo> filesToProcess = new ArrayList<>();
- List<FileInfo> outerDirectories = getOuterDirectories(fs, minWatermark);
+ List<FileInfo> outerDirectories = getOuterDirectories(fs, minWatermark, maxAllowedWatermark);
for (FileInfo outerDirectory: outerDirectories) {
FileStatus[] files = fs.listStatus(
new Path(outerDirectory.getFilePath()),
@@ -117,7 +123,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
}
}
- private List<FileInfo> getOuterDirectories(FileSystem fs, long minWatermark) throws IOException {
+ private List<FileInfo> getOuterDirectories(FileSystem fs, long minWatermark, long maxAllowedWatermark) throws IOException {
LOGGER.debug("Listing contents of {}", sourceDir);
FileStatus[] fileStatus = fs.listStatus(sourceDir);
@@ -133,7 +139,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
long watermark = getWatermarkFromString(
extractWatermarkFromDirectory(file.getPath().getName())
);
- if (watermark > minWatermark) {
+ if (watermark > minWatermark && watermark < maxAllowedWatermark) {
LOGGER.info("Processing directory {} with watermark {}",
file.getPath(),
watermark);
@@ -143,8 +149,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
watermark
));
} else {
- LOGGER.info("Ignoring directory {} - watermark {} is less than minWatermark {}", file.getPath(), watermark,
- minWatermark);
+ LOGGER.info("Ignoring directory {} - watermark {} is not between minWatermark {} and (now-leadTime) {}",
+ file.getPath(), watermark, minWatermark, maxAllowedWatermark);
}
} catch (IllegalArgumentException e) {
LOGGER.info("Directory {} ({}) does not match pattern {}; skipping", file.getPath().getName(),
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java b/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
index 9a01b3a..208a5ae 100644
--- a/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
+++ b/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
@@ -24,6 +24,7 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
+import org.joda.time.DateTime;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -37,7 +38,8 @@ public class RegexBasedPartitionedRetrieverTest {
private Path tempDir;
private enum DateToUse {
- APR_1_2017(1491004800000L), APR_3_2017(1491177600000L), MAY_1_2017(1493596800000L);
+ APR_1_2017(1491004800000L), APR_3_2017(1491177600000L), MAY_1_2017(1493596800000L),
+ TWENTY_THREE_HOURS_AGO(new DateTime().minusHours(23).getMillis());
private final long value;
@@ -93,11 +95,31 @@ public class RegexBasedPartitionedRetrieverTest {
r.init(state);
List<PartitionAwareFileRetriever.FileInfo> files = r.getFilesToProcess(DateToUse.APR_3_2017.getValue() - 1, 9999);
- Assert.assertEquals(files.size(), 2);
+ Assert.assertEquals(files.size(), 3);
verifyFile(files.get(0), DateToUse.APR_3_2017.getValue());
verifyFile(files.get(1), DateToUse.MAY_1_2017.getValue());
+ verifyFile(files.get(2), DateToUse.TWENTY_THREE_HOURS_AGO.getValue());
+ }
+ @Test
+ public void testLeadtime() throws IOException {
+ String snapshotRegex = "(\\d+)-PT-\\d+";
+ RegexBasedPartitionedRetriever r = new RegexBasedPartitionedRetriever("txt");
+ SourceState state = new SourceState();
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, tempDir.toString());
+ state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN,
+ snapshotRegex);
+ state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY, "DAY");
+ state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME, "1");
+ r.init(state);
+
+ List<PartitionAwareFileRetriever.FileInfo> files = r.getFilesToProcess(DateToUse.APR_3_2017.getValue() - 1, 9999);
+ Assert.assertEquals(files.size(), 2);
+
+ verifyFile(files.get(0), DateToUse.APR_3_2017.getValue());
+ verifyFile(files.get(1), DateToUse.MAY_1_2017.getValue());
}
private void verifyFile(PartitionAwareFileRetriever.FileInfo fileInfo, long value) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java b/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
index 577f4b8..145c522 100644
--- a/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
+++ b/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
@@ -98,13 +98,15 @@ public class DatePartitionedAvroFileExtractorTest {
this.schema = new Schema.Parser().parse(AVRO_SCHEMA);
//set up datetime objects
- DateTime now = new DateTime(TZ).minusHours(2);
+ DateTime now = new DateTime(TZ).minusHours(6);
this.startDateTime =
new DateTime(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), now.getHourOfDay(), 30, 0, TZ);
//create records, shift their timestamp by 1 minute
DateTime recordDt = startDateTime;
- for (int i = 0; i < RECORD_SIZE; i++) {
+ recordTimestamps[0] = recordDt.getMillis();
+ recordDt = recordDt.plusHours(4);
+ for (int i = 1; i < RECORD_SIZE; i++) {
recordDt = recordDt.plusMinutes(1);
recordTimestamps[i] = recordDt.getMillis();
}
@@ -170,6 +172,37 @@ public class DatePartitionedAvroFileExtractorTest {
}
@Test
+ public void testReadPartitionsByMinuteWithLeadtime() throws IOException, DataRecordException {
+
+ DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
+
+ SourceState state = new SourceState();
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
+ state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, SOURCE_ENTITY);
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, OUTPUT_DIR + Path.SEPARATOR + SOURCE_ENTITY);
+ state.setProp(ConfigurationKeys.SOURCE_ENTITY, SOURCE_ENTITY);
+ state.setProp(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, 2);
+
+ state.setProp("date.partitioned.source.partition.pattern", DATE_PATTERN);
+ state.setProp("date.partitioned.source.min.watermark.value", DateTimeFormat.forPattern(DATE_PATTERN).print(
+ this.startDateTime.minusMinutes(1)));
+ state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, TableType.SNAPSHOT_ONLY);
+ state.setProp("date.partitioned.source.partition.prefix", PREFIX);
+ state.setProp("date.partitioned.source.partition.suffix", SUFFIX);
+ state.setProp("date.partitioned.source.partition.lead_time.size", "3");
+ state.setProp("date.partitioned.source.partition.lead_time.granularity", "HOUR");
+
+ /*
+ * Since lead time is 3 hours, only the first WorkUnit (roughly 6 hours old; the rest are roughly
+ * 2 hours old) should get picked up
+ */
+ List<WorkUnit> workunits = source.getWorkunits(state);
+
+ Assert.assertEquals(workunits.size(), 1);
+ verifyWorkUnits(workunits, workunits.size());
+ }
+
+ @Test
public void testWorksNoPrefix() throws IOException, DataRecordException {
DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
@@ -195,7 +228,11 @@ public class DatePartitionedAvroFileExtractorTest {
private void verifyWorkUnits(List<WorkUnit> workunits)
throws IOException, DataRecordException {
- for (int i = 0; i < RECORD_SIZE; i++) {
+ verifyWorkUnits(workunits, RECORD_SIZE);
+ }
+
+ private void verifyWorkUnits(List<WorkUnit> workunits, int expectedSize) throws DataRecordException, IOException {
+ for (int i = 0; i < expectedSize; i++) {
WorkUnit workUnit = ((MultiWorkUnit) workunits.get(i)).getWorkUnits().get(0);
WorkUnitState wuState = new WorkUnitState(workunits.get(i), new State());
wuState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java b/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
index 5834914..be8be66 100644
--- a/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
+++ b/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
@@ -17,12 +17,13 @@ import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.DateTimeFieldType;
+import org.joda.time.chrono.ISOChronology;
/**
* Temporal granularity types for writing ({@link gobblin.writer.partitioner.TimeBasedWriterPartitioner}) and reading
* ({@link gobblin.source.DatePartitionedAvroFileSource}) date partitioned data.
- *
+ *
* @author Lorand Bendig
*
*/
@@ -62,7 +63,7 @@ public enum DatePartitionType {
/**
* @param pattern full partitioning pattern
- * @return a DateTimeFieldType corresponding to the smallest temporal unit in the pattern.
+ * @return a DateTimeFieldType corresponding to the smallest temporal unit in the pattern.
* E.g for yyyy/MM/dd {@link DateTimeFieldType#dayOfMonth()}
*/
public static DateTimeFieldType getLowestIntervalUnit(String pattern) {
@@ -75,15 +76,23 @@ public enum DatePartitionType {
}
return intervalUnit;
}
-
+
+ /**
+ * Get the number of milliseconds associated with a partition type. E.g.,
+ * DatePartitionType.MINUTE.getUnitMilliseconds() returns 60,000.
+ */
+ public long getUnitMilliseconds() {
+ return dateTimeField.getDurationType().getField(ISOChronology.getInstance()).getUnitMillis();
+ }
+
public DateTimeFieldType getDateTimeFieldType() {
return dateTimeField;
}
-
+
public int getField(DateTime dateTime) {
return dateTime.get(this.dateTimeField);
}
-
+
public String getDateTimePattern() {
return dateTimePattern;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java b/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java
new file mode 100644
index 0000000..a46d258
--- /dev/null
+++ b/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gobblin.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DatePartitionTypeTest {
+ @Test
+ public void testGetMillis() {
+ Assert.assertEquals(DatePartitionType.MINUTE.getUnitMilliseconds(), 60000L);
+ Assert.assertEquals(DatePartitionType.HOUR.getUnitMilliseconds(), 60 * 60 * 1000L);
+ }
+}