You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/07/29 08:15:37 UTC

incubator-gobblin git commit: Add lead time functionality to DatePartitionedRetrievers

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3b64cf70f -> 85d724c64


Add lead time functionality to DatePartitionedRetrievers

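Add lead time functionality to the date-partitioned retrievers
(DatePartitionedNestedRetriever and RegexBasedPartitionedRetriever).
Date partitions whose timestamp (as derived from the partition
directory name, not its filesystem creation time) is later than
(now() - lead time) will be ignored by the source.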

minor style tweaks

Closes #2003 from eogren/datepartitionleadtime


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/85d724c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/85d724c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/85d724c6

Branch: refs/heads/master
Commit: 85d724c64dc5ab842992a19bdfa59b3ee3d8c24f
Parents: 3b64cf7
Author: Eric Ogren <eo...@linkedin.com>
Authored: Sat Jul 29 01:15:33 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Sat Jul 29 01:15:33 2017 -0700

----------------------------------------------------------------------
 .../source/DatePartitionedNestedRetriever.java  |  6 ++-
 .../PartitionAwareFileRetrieverUtils.java       | 55 ++++++++++++++++++++
 .../source/PartitionedFileSourceBase.java       | 16 ++++++
 .../source/RegexBasedPartitionedRetriever.java  | 16 ++++--
 .../RegexBasedPartitionedRetrieverTest.java     | 26 ++++++++-
 .../DatePartitionedAvroFileExtractorTest.java   | 43 +++++++++++++--
 .../java/gobblin/util/DatePartitionType.java    | 19 +++++--
 .../gobblin/util/DatePartitionTypeTest.java     | 29 +++++++++++
 8 files changed, 194 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
index 0dc2f9c..c0bc1dc 100644
--- a/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
 import org.joda.time.DurationFieldType;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -46,6 +47,7 @@ import gobblin.util.DatePartitionType;
 
 import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN;
 
+
 /**
  * PartitionRetriever that is optimized for nested directory structures where data is dumped on a regular basis
  * and most data has likely been processed by Gobblin already.
@@ -69,6 +71,7 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
   private FileSystem fs;
   private HadoopFsHelper helper;
   private final String expectedExtension;
+  private Duration leadTimeDuration;
 
   public DatePartitionedNestedRetriever(String expectedExtension) {
     this.expectedExtension = expectedExtension;
@@ -86,13 +89,14 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
     this.sourcePartitionSuffix =
         state.getProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX, StringUtils.EMPTY);
     this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+    this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state);
     this.helper = new HadoopFsHelper(state);
   }
 
   @Override
   public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn)
       throws IOException {
-    DateTime currentDay = new DateTime();
+    DateTime currentDay = new DateTime().minus(leadTimeDuration);
     DateTime lowWaterMarkDate = new DateTime(minWatermark);
     List<FileInfo> filesToProcess = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java b/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java
new file mode 100644
index 0000000..926267d
--- /dev/null
+++ b/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gobblin.source;
+
+import org.joda.time.Duration;
+
+import gobblin.configuration.State;
+import gobblin.util.DatePartitionType;
+
+import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME;
+import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY;
+import static gobblin.source.PartitionedFileSourceBase.DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY;
+import static gobblin.source.PartitionedFileSourceBase.DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME;
+
+
+/**
+ * Utility functions for parsing configuration parameters commonly used by {@link PartitionAwareFileRetriever}
+ * objects.
+ */
+public class PartitionAwareFileRetrieverUtils {
+  /**
+   * Retrieve the lead time duration from the LEAD_TIME and LEAD_TIME granularity config settings.
+   */
+  public static Duration getLeadTimeDurationFromConfig(State state) {
+    String leadTimeProp = state.getProp(DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME);
+    if (leadTimeProp == null || leadTimeProp.length() == 0) {
+      return DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME;
+    }
+
+    int leadTime = Integer.parseInt(leadTimeProp);
+
+    DatePartitionType leadTimeGranularity = DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY;
+
+    String leadTimeGranularityProp = state.getProp(DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY);
+    if (leadTimeGranularityProp != null) {
+      leadTimeGranularity = DatePartitionType.valueOf(leadTimeGranularityProp);
+    }
+
+    return new Duration(leadTime * leadTimeGranularity.getUnitMilliseconds());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
index 041538c..de26450 100644
--- a/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.joda.time.Duration;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
@@ -77,6 +78,21 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
       DatePartitionType.HOUR;
 
   /**
+   * The partition 'lead time' allows a job to ignore a date partition for a given amount of time.
+   * For example, if the lead_time is set to 1 day and the job is run on Jul 1 2017 at 2am, the job
+   * will only process partitions from Jun 30 2017 at 2am and before.
+   */
+  public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME =
+      DATE_PARTITIONED_SOURCE_PREFIX + ".partition.lead_time.size";
+  public static final Duration DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME = new Duration(0);
+
+  public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY =
+      DATE_PARTITIONED_SOURCE_PREFIX + ".partition.lead_time.granularity";
+
+  public static final DatePartitionType DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY =
+      DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY;
+
+  /**
   * A String of the format defined by {@link #DATE_PARTITIONED_SOURCE_PARTITION_PATTERN} or
   * {@link #DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY}. For example for yyyy/MM/dd the
   * 2015/01/01 corresponds to January 1st, 2015. The job will start reading data from this point in time.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
index 9f43b6c..8ad8341 100644
--- a/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
+++ b/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +47,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
   private HadoopFsHelper helper;
   private Path sourceDir;
   private final String expectedExtension;
+  private Duration leadTime;
 
   public RegexBasedPartitionedRetriever(String expectedExtension) {
     this.expectedExtension = expectedExtension;
@@ -57,6 +60,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
       PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN
     );
 
+    this.leadTime = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state);
     this.pattern = Pattern.compile(regexPattern);
     this.helper = new HadoopFsHelper(state);
     this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
@@ -87,12 +91,14 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
       throws IOException {
     // This implementation assumes snapshots are always in the root directory and the number of them
     // remains relatively small
+    long maxAllowedWatermark = new DateTime().minus(leadTime).getMillis();
+
     try {
       this.helper.connect();
       FileSystem fs = helper.getFileSystem();
       List<FileInfo> filesToProcess = new ArrayList<>();
 
-      List<FileInfo> outerDirectories = getOuterDirectories(fs, minWatermark);
+      List<FileInfo> outerDirectories = getOuterDirectories(fs, minWatermark, maxAllowedWatermark);
       for (FileInfo outerDirectory: outerDirectories) {
         FileStatus[] files = fs.listStatus(
             new Path(outerDirectory.getFilePath()),
@@ -117,7 +123,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
     }
   }
 
-  private List<FileInfo> getOuterDirectories(FileSystem fs, long minWatermark) throws IOException {
+  private List<FileInfo> getOuterDirectories(FileSystem fs, long minWatermark, long maxAllowedWatermark) throws IOException {
     LOGGER.debug("Listing contents of {}", sourceDir);
 
     FileStatus[] fileStatus = fs.listStatus(sourceDir);
@@ -133,7 +139,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
         long watermark = getWatermarkFromString(
             extractWatermarkFromDirectory(file.getPath().getName())
         );
-        if (watermark > minWatermark) {
+        if (watermark > minWatermark && watermark < maxAllowedWatermark) {
           LOGGER.info("Processing directory {} with watermark {}",
               file.getPath(),
               watermark);
@@ -143,8 +149,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
               watermark
           ));
         } else {
-          LOGGER.info("Ignoring directory {} - watermark {} is less than minWatermark {}", file.getPath(), watermark,
-              minWatermark);
+          LOGGER.info("Ignoring directory {} - watermark {} is not between minWatermark {} and (now-leadTime) {}",
+              file.getPath(), watermark, minWatermark, maxAllowedWatermark);
         }
       } catch (IllegalArgumentException e) {
         LOGGER.info("Directory {} ({}) does not match pattern {}; skipping", file.getPath().getName(),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java b/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
index 9a01b3a..208a5ae 100644
--- a/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
+++ b/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java
@@ -24,6 +24,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.List;
 
+import org.joda.time.DateTime;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -37,7 +38,8 @@ public class RegexBasedPartitionedRetrieverTest {
   private Path tempDir;
 
   private enum DateToUse {
-    APR_1_2017(1491004800000L), APR_3_2017(1491177600000L), MAY_1_2017(1493596800000L);
+    APR_1_2017(1491004800000L), APR_3_2017(1491177600000L), MAY_1_2017(1493596800000L),
+    TWENTY_THREE_HOURS_AGO(new DateTime().minusHours(23).getMillis());
 
     private final long value;
 
@@ -93,11 +95,31 @@ public class RegexBasedPartitionedRetrieverTest {
     r.init(state);
 
     List<PartitionAwareFileRetriever.FileInfo> files = r.getFilesToProcess(DateToUse.APR_3_2017.getValue() - 1, 9999);
-    Assert.assertEquals(files.size(), 2);
+    Assert.assertEquals(files.size(), 3);
 
     verifyFile(files.get(0), DateToUse.APR_3_2017.getValue());
     verifyFile(files.get(1), DateToUse.MAY_1_2017.getValue());
+    verifyFile(files.get(2), DateToUse.TWENTY_THREE_HOURS_AGO.getValue());
+ }
 
+ @Test
+ public void testLeadtime() throws IOException {
+     String snapshotRegex = "(\\d+)-PT-\\d+";
+    RegexBasedPartitionedRetriever r = new RegexBasedPartitionedRetriever("txt");
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, tempDir.toString());
+    state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN,
+        snapshotRegex);
+    state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY, "DAY");
+    state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME, "1");
+    r.init(state);
+
+    List<PartitionAwareFileRetriever.FileInfo> files = r.getFilesToProcess(DateToUse.APR_3_2017.getValue() - 1, 9999);
+    Assert.assertEquals(files.size(), 2);
+
+    verifyFile(files.get(0), DateToUse.APR_3_2017.getValue());
+    verifyFile(files.get(1), DateToUse.MAY_1_2017.getValue());
  }
 
   private void verifyFile(PartitionAwareFileRetriever.FileInfo fileInfo, long value) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java b/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
index 577f4b8..145c522 100644
--- a/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
+++ b/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
@@ -98,13 +98,15 @@ public class DatePartitionedAvroFileExtractorTest {
     this.schema = new Schema.Parser().parse(AVRO_SCHEMA);
 
     //set up datetime objects
-    DateTime now = new DateTime(TZ).minusHours(2);
+    DateTime now = new DateTime(TZ).minusHours(6);
     this.startDateTime =
         new DateTime(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), now.getHourOfDay(), 30, 0, TZ);
 
     //create records, shift their timestamp by 1 minute
     DateTime recordDt = startDateTime;
-    for (int i = 0; i < RECORD_SIZE; i++) {
+    recordTimestamps[0] = recordDt.getMillis();
+    recordDt = recordDt.plusHours(4);
+    for (int i = 1; i < RECORD_SIZE; i++) {
       recordDt = recordDt.plusMinutes(1);
       recordTimestamps[i] = recordDt.getMillis();
     }
@@ -170,6 +172,37 @@ public class DatePartitionedAvroFileExtractorTest {
   }
 
   @Test
+  public void testReadPartitionsByMinuteWithLeadtime() throws IOException, DataRecordException {
+
+    DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
+
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
+    state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, OUTPUT_DIR + Path.SEPARATOR + SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_ENTITY, SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, 2);
+
+    state.setProp("date.partitioned.source.partition.pattern", DATE_PATTERN);
+    state.setProp("date.partitioned.source.min.watermark.value", DateTimeFormat.forPattern(DATE_PATTERN).print(
+        this.startDateTime.minusMinutes(1)));
+    state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, TableType.SNAPSHOT_ONLY);
+    state.setProp("date.partitioned.source.partition.prefix", PREFIX);
+    state.setProp("date.partitioned.source.partition.suffix", SUFFIX);
+    state.setProp("date.partitioned.source.partition.lead_time.size", "3");
+    state.setProp("date.partitioned.source.partition.lead_time.granularity", "HOUR");
+
+    /*
+     * Since lead time is 3 hours, only the first WorkUnit (which is 6 hours old, rest are 2hrs) should get
+     * picked up
+     */
+    List<WorkUnit> workunits = source.getWorkunits(state);
+
+    Assert.assertEquals(workunits.size(), 1);
+    verifyWorkUnits(workunits, workunits.size());
+  }
+
+  @Test
   public void testWorksNoPrefix() throws IOException, DataRecordException {
     DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
 
@@ -195,7 +228,11 @@ public class DatePartitionedAvroFileExtractorTest {
 
   private void verifyWorkUnits(List<WorkUnit> workunits)
       throws IOException, DataRecordException {
-    for (int i = 0; i < RECORD_SIZE; i++) {
+    verifyWorkUnits(workunits, RECORD_SIZE);
+  }
+
+  private void verifyWorkUnits(List<WorkUnit> workunits, int expectedSize) throws DataRecordException, IOException {
+    for (int i = 0; i < expectedSize; i++) {
       WorkUnit workUnit = ((MultiWorkUnit) workunits.get(i)).getWorkUnits().get(0);
       WorkUnitState wuState = new WorkUnitState(workunits.get(i), new State());
       wuState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java b/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
index 5834914..be8be66 100644
--- a/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
+++ b/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java
@@ -17,12 +17,13 @@ import java.util.Map;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeFieldType;
+import org.joda.time.chrono.ISOChronology;
 
 
 /**
  * Temporal granularity types for writing ({@link gobblin.writer.partitioner.TimeBasedWriterPartitioner}) and reading
  * ({@link gobblin.source.DatePartitionedAvroFileSource}) date partitioned data.
- * 
+ *
  * @author Lorand Bendig
  *
  */
@@ -62,7 +63,7 @@ public enum DatePartitionType {
 
   /**
    * @param pattern full partitioning pattern
-   * @return a DateTimeFieldType corresponding to the smallest temporal unit in the pattern. 
+   * @return a DateTimeFieldType corresponding to the smallest temporal unit in the pattern.
    * E.g for yyyy/MM/dd {@link DateTimeFieldType#dayOfMonth()}
    */
   public static DateTimeFieldType getLowestIntervalUnit(String pattern) {
@@ -75,15 +76,23 @@ public enum DatePartitionType {
     }
     return intervalUnit;
   }
-  
+
+  /**
+   * Get the number of milliseconds associated with a partition type. Eg
+   * getUnitMilliseconds() of DatePartitionType.MINUTE = 60,000.
+   */
+  public long getUnitMilliseconds() {
+    return dateTimeField.getDurationType().getField(ISOChronology.getInstance()).getUnitMillis();
+  }
+
   public DateTimeFieldType getDateTimeFieldType() {
     return dateTimeField;
   }
-  
+
   public int getField(DateTime dateTime) {
     return dateTime.get(this.dateTimeField);
   }
-  
+
   public String getDateTimePattern() {
     return dateTimePattern;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java b/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java
new file mode 100644
index 0000000..a46d258
--- /dev/null
+++ b/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gobblin.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DatePartitionTypeTest {
+  @Test
+  public void testGetMillis() {
+    Assert.assertEquals(DatePartitionType.MINUTE.getUnitMilliseconds(), 60000L);
+    Assert.assertEquals(DatePartitionType.HOUR.getUnitMilliseconds(), 60 * 60 * 1000L);
+  }
+}