You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/19 20:10:35 UTC

svn commit: r1340527 - in /incubator/flume/trunk: flume-ng-core/src/main/java/org/apache/flume/formatter/output/ flume-ng-core/src/main/java/org/apache/flume/tools/ flume-ng-core/src/test/java/org/apache/flume/formatter/ flume-ng-core/src/test/java/org...

Author: arvind
Date: Sat May 19 18:10:34 2012
New Revision: 1340527

URL: http://svn.apache.org/viewvc?rev=1340527&view=rev
Log:
FLUME-1213. Support for timestamp round-down for bucket path.

(Hari Shreedharan via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java   (with props)
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java   (with props)
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java   (with props)
Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java?rev=1340527&r1=1340526&r2=1340527&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java Sat May 19 18:10:34 2012
@@ -19,12 +19,15 @@
 package org.apache.flume.formatter.output;
 
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.flume.tools.TimestampRoundDownUtil;
+
 import com.google.common.base.Preconditions;
 
 public class BucketPath {
@@ -109,6 +112,33 @@ public class BucketPath {
    *
    */
   public static String replaceShorthand(char c, Map<String, String> headers) {
+    return replaceShorthand(c, headers, false, 0, 0);
+  }
+
+  /**
+   * Hardcoded lookups for %x style escape replacement. Add your own!
+   *
+   * All shorthands are Date format strings, currently.
+   *
+   * Returns the empty string if an escape is not recognized.
+   *
+   * Dates follow the same format as unix date, with a few exceptions.
+   * @param c - The character to replace.
+   * @param headers - Event headers
+   * @param needRounding - Should the timestamp be rounded down?
+   * @param unit - if needRounding is true, what unit to round down to. This
+   * must be one of the units specified by {@link java.util.Calendar} -
+   * HOUR, MINUTE or SECOND. Defaults to second, if none of these are present.
+   * Ignored if needRounding is false.
+   * @param roundDown - if needRounding is true,
+   * The time should be rounded to the largest multiple of this
+   * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
+   * to the second/minute/hour immediately lower than the timestamp supplied.
+   * Ignored if needRounding is false.
+   * @return
+   */
+  public static String replaceShorthand(char c, Map<String, String> headers,
+      boolean needRounding, int unit, int roundDown) {
     // It's a date
     String formatString = "";
     switch (c) {
@@ -160,7 +190,12 @@ public class BucketPath {
       formatString = "a";
       break;
     case 's':
-      return "" + (Long.valueOf(headers.get("timestamp"))/ 1000);
+      long ts = Long.valueOf(headers.get("timestamp"));
+      if(needRounding){
+        ts = roundDown(
+            roundDown, unit, ts);
+      }
+      return "" + (ts/1000);
     case 'S':
       formatString = "ss";
       break;
@@ -182,10 +217,40 @@ public class BucketPath {
       return "";
     }
     SimpleDateFormat format = new SimpleDateFormat(formatString);
-    Date date = new Date(Long.valueOf(headers.get("timestamp")));
+    long ts = Long.valueOf(headers.get("timestamp"));
+    long timestamp = ts;
+    if(needRounding){
+      timestamp = roundDown(roundDown, unit, ts);
+    }
+    Date date = new Date(timestamp);
     return format.format(date);
   }
 
+  private static long roundDown(int roundDown, int unit, long ts){
+    long timestamp = ts;
+    if(roundDown <= 0){
+      roundDown = 1;
+    }
+    switch (unit) {
+      case Calendar.SECOND:
+        timestamp = TimestampRoundDownUtil.roundDownTimeStampSeconds(
+            ts, roundDown);
+        break;
+      case Calendar.MINUTE:
+        timestamp = TimestampRoundDownUtil.roundDownTimeStampMinutes(
+            ts, roundDown);
+        break;
+      case Calendar.HOUR_OF_DAY:
+        timestamp = TimestampRoundDownUtil.roundDownTimeStampHours(
+            ts, roundDown);
+        break;
+      default:
+        timestamp = ts;
+        break;
+    }
+    return timestamp;
+  }
+
   /**
    * Replace all substrings of form %{tagname} with get(tagname).toString() and
    * all shorthand substrings of form %x with a special value.
@@ -195,7 +260,33 @@ public class BucketPath {
    * TODO(henry): we may want to consider taking this out of Event and into a
    * more general class when we get more use cases for this pattern.
    */
-  public static String escapeString(String in, Map<String, String> headers) {
+  public static String escapeString(String in, Map<String, String> headers){
+    return escapeString(in, headers, false, 0, 0);
+  }
+
+  /**
+   * Replace all substrings of form %{tagname} with get(tagname).toString() and
+   * all shorthand substrings of form %x with a special value.
+   *
+   * Any unrecognized / not found tags will be replaced with the empty string.
+   *
+   * TODO(henry): we may want to consider taking this out of Event and into a
+   * more general class when we get more use cases for this pattern.
+   *
+   * @param needRounding - Should the timestamp be rounded down?
+   * @param unit - if needRounding is true, what unit to round down to. This
+   * must be one of the units specified by {@link java.util.Calendar} -
+   * HOUR, MINUTE or SECOND. Defaults to second, if none of these are present.
+   * Ignored if needRounding is false.
+   * @param roundDown - if needRounding is true,
+   * The time should be rounded to the largest multiple of this
+   * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
+   * to the second/minute/hour immediately lower than the timestamp supplied.
+   * Ignored if needRounding is false.
+   * @return Escaped string.
+   */
+  public static String escapeString(String in, Map<String, String> headers,
+      boolean needRounding, int unit, int roundDown) {
     Matcher matcher = tagPattern.matcher(in);
     StringBuffer sb = new StringBuffer();
     while (matcher.find()) {
@@ -216,7 +307,8 @@ public class BucketPath {
             && matcher.group(1).length() == 1,
             "Expected to match single character tag in string " + in);
         char c = matcher.group(1).charAt(0);
-        replacement = replaceShorthand(c, headers);
+        replacement = replaceShorthand(c, headers,
+            needRounding, unit, roundDown);
       }
 
       // The replacement string must have '$' and '\' chars escaped. This
@@ -243,7 +335,13 @@ public class BucketPath {
    * mapping of an attribute name to the value based on the escape sequence
    * found in the argument string.
    */
-  public static Map<String, String> getEscapeMapping(String in, Map<String, String> headers) {
+  public static Map<String, String> getEscapeMapping(String in,
+      Map<String, String> headers) {
+    return getEscapeMapping(in, headers, false, 0, 0);
+  }
+  public static Map<String, String> getEscapeMapping(String in,
+      Map<String, String> headers, boolean needRounding,
+      int unit, int roundDown) {
     Map<String, String> mapping = new HashMap<String, String>();
     Matcher matcher = tagPattern.matcher(in);
     while (matcher.find()) {
@@ -266,7 +364,8 @@ public class BucketPath {
             && matcher.group(1).length() == 1,
             "Expected to match single character tag in string " + in);
         char c = matcher.group(1).charAt(0);
-        replacement = replaceShorthand(c, headers);
+        replacement = replaceShorthand(c, headers,
+            needRounding, unit, roundDown);
         mapping.put(expandShorthand(c), replacement);
       }
     }

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java?rev=1340527&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java Sat May 19 18:10:34 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.tools;
+
+import java.util.Calendar;
+
+import com.google.common.base.Preconditions;
+
+public class TimestampRoundDownUtil {
+
+  /**
+   *
+   * @param timestamp - The time stamp to be rounded down.
+   * @param roundDownSec - The <tt>timestamp</tt> is rounded down to the largest
+   * multiple of <tt>roundDownSec</tt> seconds
+   * less than or equal to <tt>timestamp.</tt> Should be between 0 and 60.
+   * @return - Rounded down timestamp
+   * @throws IllegalStateException
+   */
+  public static long roundDownTimeStampSeconds(long timestamp,
+      int roundDownSec) throws IllegalStateException {
+    Preconditions.checkArgument(roundDownSec > 0 && roundDownSec <=60,
+        "RoundDownSec must be > 0 and <=60");
+    Calendar cal = roundDownField(timestamp, Calendar.SECOND, roundDownSec);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTimeInMillis();
+  }
+
+  /**
+   *
+   * @param timestamp - The time stamp to be rounded down.
+   * @param roundDownMins - The <tt>timestamp</tt> is rounded down to the
+   * largest multiple of <tt>roundDownMins</tt> minutes less than
+   * or equal to <tt>timestamp.</tt> Should be between 0 and 60.
+   * @return - Rounded down timestamp
+   * @throws IllegalStateException
+   */
+  public static long roundDownTimeStampMinutes(long timestamp,
+      int roundDownMins) throws IllegalStateException {
+    Preconditions.checkArgument(roundDownMins > 0 && roundDownMins <=60,
+        "RoundDown must be > 0 and <=60");
+    Calendar cal = roundDownField(timestamp, Calendar.MINUTE, roundDownMins);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTimeInMillis();
+
+  }
+
+  /**
+   *
+   * @param timestamp - The time stamp to be rounded down.
+   * @param roundDownHours - The <tt>timestamp</tt> is rounded down to the
+   * largest multiple of <tt>roundDownHours</tt> hours less than
+   * or equal to <tt>timestamp.</tt> Should be between 0 and 24.
+   * @return - Rounded down timestamp
+   * @throws IllegalStateException
+   */
+  public static long roundDownTimeStampHours(long timestamp,
+      int roundDownHours) throws IllegalStateException {
+    Preconditions.checkArgument(roundDownHours > 0 && roundDownHours <=24,
+        "RoundDown must be > 0 and <=24");
+    Calendar cal = roundDownField(timestamp,
+        Calendar.HOUR_OF_DAY, roundDownHours);
+    cal.set(Calendar.MINUTE, 0);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTimeInMillis();
+  }
+
+  private static Calendar roundDownField(
+      long timestamp, int field, int roundDown){
+    Preconditions.checkArgument(timestamp > 0, "Timestamp must be positive");
+    Calendar cal = Calendar.getInstance();
+    cal.setTimeInMillis(timestamp);
+    int fieldVal = cal.get(field);
+    int remainder =  (fieldVal % roundDown);
+    cal.set(field, fieldVal - remainder);
+    return cal;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java?rev=1340527&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java Sat May 19 18:10:34 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBucketPath {
+  Calendar cal;
+  Map<String, String> headers;
+  @Before
+  public void setUp(){
+    cal = Calendar.getInstance();
+    cal.set(2012, 5, 23, 13, 46, 33);
+    cal.set(Calendar.MILLISECOND, 234);
+    headers = new HashMap<String, String>();
+    headers.put("timestamp", String.valueOf(cal.getTimeInMillis()));
+  }
+  @Test
+  public void testDateFormatHours() {
+    String test = "%c";
+    String escapedString = BucketPath.escapeString(
+        test, headers, true, Calendar.HOUR_OF_DAY, 12);
+    System.out.println("Escaped String: " + escapedString);
+    Calendar cal2 = Calendar.getInstance();
+    cal2.set(2012, 5, 23, 12, 0, 0);
+    cal2.set(Calendar.MILLISECOND, 0);
+    SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+    Date d = new Date(cal2.getTimeInMillis());
+    String expectedString = format.format(d);
+    System.out.println("Expected String: "+ expectedString);
+    Assert.assertEquals(expectedString, escapedString);
+  }
+
+  @Test
+  public void testDateFormatMinutes() {
+    String test = "%s";
+    String escapedString = BucketPath.escapeString(
+        test, headers, true, Calendar.MINUTE, 5);
+    System.out.println("Escaped String: " + escapedString);
+    Calendar cal2 = Calendar.getInstance();
+    cal2.set(2012, 5, 23, 13, 45, 0);
+    cal2.set(Calendar.MILLISECOND, 0);
+    String expectedString = String.valueOf(cal2.getTimeInMillis()/1000);
+    System.out.println("Expected String: "+ expectedString);
+    Assert.assertEquals(expectedString, escapedString);
+  }
+
+  @Test
+  public void testDateFormatSeconds() {
+    String test = "%s";
+    String escapedString = BucketPath.escapeString(
+        test, headers, true, Calendar.SECOND, 5);
+    System.out.println("Escaped String: " + escapedString);
+    Calendar cal2 = Calendar.getInstance();
+    cal2.set(2012, 5, 23, 13, 46, 30);
+    cal2.set(Calendar.MILLISECOND, 0);
+    String expectedString = String.valueOf(cal2.getTimeInMillis()/1000);
+    System.out.println("Expected String: "+ expectedString);
+    Assert.assertEquals(expectedString, escapedString);
+  }
+
+  @Test
+  public void testNoRounding(){
+    String test = "%c";
+    String escapedString = BucketPath.escapeString(
+        test, headers, false, Calendar.HOUR_OF_DAY, 12);
+    System.out.println("Escaped String: " + escapedString);
+    SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+    Date d = new Date(cal.getTimeInMillis());
+    String expectedString = format.format(d);
+    System.out.println("Expected String: "+ expectedString);
+    Assert.assertEquals(expectedString, escapedString);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java?rev=1340527&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java Sat May 19 18:10:34 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.tools;
+
+
+import java.util.Calendar;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestTimestampRoundDownUtil {
+
+  @Test
+  public void testRoundDownTimeStampSeconds() {
+    Calendar cal = Calendar.getInstance();
+    cal.clear();
+    cal.set(2012, 5, 15, 15, 12, 54);
+    cal.set(Calendar.MILLISECOND, 20);
+    Calendar cal2 = Calendar.getInstance();
+    cal2.clear();
+    cal2.set(2012, 5, 15, 15, 12, 0);
+    cal2.set(Calendar.MILLISECOND, 0);
+    long timeToVerify = cal2.getTimeInMillis();
+    long ret = TimestampRoundDownUtil.
+        roundDownTimeStampSeconds(cal.getTimeInMillis(), 60);
+    System.out.println("Cal 1: " + cal.toString());
+    System.out.println("Cal 2: " + cal2.toString());
+    Assert.assertEquals(timeToVerify, ret);
+  }
+
+  @Test
+  public void testRoundDownTimeStampMinutes() {
+    Calendar cal = Calendar.getInstance();
+    cal.clear();
+    cal.set(2012, 5, 15, 15, 12, 54);
+    cal.set(Calendar.MILLISECOND, 20);
+    Calendar cal2 = Calendar.getInstance();
+    cal2.clear();
+    cal2.set(2012, 5, 15, 15, 10, 0);
+    cal2.set(Calendar.MILLISECOND, 0);
+    long timeToVerify = cal2.getTimeInMillis();
+    long ret = TimestampRoundDownUtil.
+        roundDownTimeStampMinutes(cal.getTimeInMillis(), 5);
+    System.out.println("Cal 1: " + cal.toString());
+    System.out.println("Cal 2: " + cal2.toString());
+    Assert.assertEquals(timeToVerify, ret);
+  }
+
+  @Test
+  public void testRoundDownTimeStampHours() {
+    Calendar cal = Calendar.getInstance();
+    cal.clear();
+    cal.set(2012, 5, 15, 15, 12, 54);
+    cal.set(Calendar.MILLISECOND, 20);
+    Calendar cal2 = Calendar.getInstance();
+    cal2.clear();
+    cal2.set(2012, 5, 15, 14, 0, 0);
+    cal2.set(Calendar.MILLISECOND, 0);
+    long timeToVerify = cal2.getTimeInMillis();
+    long ret = TimestampRoundDownUtil.
+        roundDownTimeStampHours(cal.getTimeInMillis(), 2);
+    System.out.println("Cal 1: " + ret);
+    System.out.println("Cal 2: " + cal2.toString());
+    Assert.assertEquals(timeToVerify, ret);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1340527&r1=1340526&r2=1340527&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Sat May 19 18:10:34 2012
@@ -21,6 +21,7 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map.Entry;
@@ -119,6 +120,10 @@ public class HDFSEventSink extends Abstr
   private String proxyUserName;
   private UserGroupInformation proxyTicket;
 
+  private boolean needRounding = false;
+  private int roundUnit = Calendar.SECOND;
+  private int roundValue = 1;
+
   private long callTimeout;
   private Context context;
 
@@ -243,6 +248,32 @@ public class HDFSEventSink extends Abstr
     if (!authenticate(path)) {
       LOG.error("Failed to authenticate!");
     }
+    needRounding = context.getBoolean("hdfs.round", false);
+
+    if(needRounding) {
+      String unit = context.getString("hdfs.roundUnit", "second");
+      if (unit.equalsIgnoreCase("hour")) {
+        this.roundUnit = Calendar.HOUR_OF_DAY;
+      } else if (unit.equalsIgnoreCase("minute")) {
+        this.roundUnit = Calendar.MINUTE;
+      } else if (unit.equalsIgnoreCase("second")){
+        this.roundUnit = Calendar.SECOND;
+      } else {
+        LOG.warn("Rounding unit is not valid, please set one of" +
+            "minute, hour, or second. Rounding will be disabled");
+        needRounding = false;
+      }
+      this.roundValue = context.getInteger("hdfs.roundValue", 1);
+      if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
+        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
+            "Round value" +
+            "must be > 0 and <= 60");
+      } else if (roundUnit == Calendar.HOUR_OF_DAY){
+        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
+            "Round value" +
+            "must be > 0 and <= 24");
+      }
+    }
   }
 
   private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -376,7 +407,8 @@ public class HDFSEventSink extends Abstr
         }
 
         // reconstruct the path name by substituting place holders
-        String realPath = BucketPath.escapeString(path, event.getHeaders());
+        String realPath = BucketPath.escapeString(path, event.getHeaders(),
+            needRounding, roundUnit, roundValue);
         BucketWriter bucketWriter = sfWriters.get(realPath);
 
         // we haven't seen this file yet, so open it and cache the handle
@@ -448,7 +480,7 @@ public class HDFSEventSink extends Abstr
       }
     } finally {
       // flush the buckets that still has pending data
-      // this ensures that the data removed from channel 
+      // this ensures that the data removed from channel
       // by the current transaction is safely on disk
       for (BucketWriter writer : writers) {
         if (writer.isBatchComplete()) {