You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/19 20:10:35 UTC
svn commit: r1340527 - in /incubator/flume/trunk:
flume-ng-core/src/main/java/org/apache/flume/formatter/output/
flume-ng-core/src/main/java/org/apache/flume/tools/
flume-ng-core/src/test/java/org/apache/flume/formatter/
flume-ng-core/src/test/java/org...
Author: arvind
Date: Sat May 19 18:10:34 2012
New Revision: 1340527
URL: http://svn.apache.org/viewvc?rev=1340527&view=rev
Log:
FLUME-1213. Support for timestamp round-down for bucket path.
(Hari Shreedharan via Arvind Prabhakar)
Added:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java (with props)
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java (with props)
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java (with props)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java?rev=1340527&r1=1340526&r2=1340527&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java Sat May 19 18:10:34 2012
@@ -19,12 +19,15 @@
package org.apache.flume.formatter.output;
import java.text.SimpleDateFormat;
+import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.flume.tools.TimestampRoundDownUtil;
+
import com.google.common.base.Preconditions;
public class BucketPath {
@@ -109,6 +112,33 @@ public class BucketPath {
*
*/
public static String replaceShorthand(char c, Map<String, String> headers) {
+ return replaceShorthand(c, headers, false, 0, 0);
+ }
+
+ /**
+ * Hardcoded lookups for %x style escape replacement. Add your own!
+ *
+ * All shorthands are Date format strings, currently.
+ *
+ * Returns the empty string if an escape is not recognized.
+ *
+ * Dates follow the same format as unix date, with a few exceptions.
+ * @param c - The character to replace.
+ * @param headers - Event headers
+ * @param needRounding - Should the timestamp be rounded down?
+ * @param unit - if needRounding is true, what unit to round down to. This
+ * must be one of the {@link java.util.Calendar} fields HOUR_OF_DAY, MINUTE
+ * or SECOND. Any other value leaves the timestamp unrounded.
+ * Ignored if needRounding is false.
+ * @param roundDown - if needRounding is true,
+ * the time is rounded down to the largest multiple of this value
+ * (in the given unit) that is less than or equal to the time supplied.
+ * Defaults to 1 if <= 0 (rounds down to the start of the second/minute/hour).
+ * Ignored if needRounding is false.
+ * @return The replacement text, or the empty string if the escape is not recognized.
+ */
+ public static String replaceShorthand(char c, Map<String, String> headers,
+ boolean needRounding, int unit, int roundDown) {
// It's a date
String formatString = "";
switch (c) {
@@ -160,7 +190,12 @@ public class BucketPath {
formatString = "a";
break;
case 's':
- return "" + (Long.valueOf(headers.get("timestamp"))/ 1000);
+ long ts = Long.valueOf(headers.get("timestamp"));
+ if(needRounding){
+ ts = roundDown(
+ roundDown, unit, ts);
+ }
+ return "" + (ts/1000);
case 'S':
formatString = "ss";
break;
@@ -182,10 +217,40 @@ public class BucketPath {
return "";
}
SimpleDateFormat format = new SimpleDateFormat(formatString);
- Date date = new Date(Long.valueOf(headers.get("timestamp")));
+ long ts = Long.valueOf(headers.get("timestamp"));
+ long timestamp = ts;
+ if(needRounding){
+ timestamp = roundDown(roundDown, unit, ts);
+ }
+ Date date = new Date(timestamp);
return format.format(date);
}
+ private static long roundDown(int roundDown, int unit, long ts){ // rounds ts (epoch millis) down in the given Calendar unit
+ long timestamp = ts;
+ if(roundDown <= 0){
+ roundDown = 1; // non-positive multiples fall back to 1
+ }
+ switch (unit) {
+ case Calendar.SECOND:
+ timestamp = TimestampRoundDownUtil.roundDownTimeStampSeconds(
+ ts, roundDown);
+ break;
+ case Calendar.MINUTE:
+ timestamp = TimestampRoundDownUtil.roundDownTimeStampMinutes(
+ ts, roundDown);
+ break;
+ case Calendar.HOUR_OF_DAY:
+ timestamp = TimestampRoundDownUtil.roundDownTimeStampHours(
+ ts, roundDown);
+ break;
+ default: // any other unit (including Calendar.HOUR): no rounding
+ timestamp = ts;
+ break;
+ }
+ return timestamp;
+ }
+
/**
* Replace all substrings of form %{tagname} with get(tagname).toString() and
* all shorthand substrings of form %x with a special value.
@@ -195,7 +260,33 @@ public class BucketPath {
* TODO(henry): we may want to consider taking this out of Event and into a
* more general class when we get more use cases for this pattern.
*/
- public static String escapeString(String in, Map<String, String> headers) {
+ public static String escapeString(String in, Map<String, String> headers){
+ return escapeString(in, headers, false, 0, 0);
+ }
+
+ /**
+ * Replace all substrings of form %{tagname} with get(tagname).toString() and
+ * all shorthand substrings of form %x with a special value.
+ *
+ * Any unrecognized / not found tags will be replaced with the empty string.
+ *
+ * TODO(henry): we may want to consider taking this out of Event and into a
+ * more general class when we get more use cases for this pattern.
+ *
+ * @param needRounding - Should the timestamp be rounded down?
+ * @param unit - if needRounding is true, what unit to round down to. This
+ * must be one of the {@link java.util.Calendar} fields HOUR_OF_DAY, MINUTE
+ * or SECOND. Any other value leaves the timestamp unrounded.
+ * Ignored if needRounding is false.
+ * @param roundDown - if needRounding is true,
+ * the time is rounded down to the largest multiple of this value
+ * (in the given unit) that is less than or equal to the time supplied.
+ * Defaults to 1 if <= 0 (rounds down to the start of the second/minute/hour).
+ * Ignored if needRounding is false.
+ * @return Escaped string.
+ */
+ public static String escapeString(String in, Map<String, String> headers,
+ boolean needRounding, int unit, int roundDown) {
Matcher matcher = tagPattern.matcher(in);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
@@ -216,7 +307,8 @@ public class BucketPath {
&& matcher.group(1).length() == 1,
"Expected to match single character tag in string " + in);
char c = matcher.group(1).charAt(0);
- replacement = replaceShorthand(c, headers);
+ replacement = replaceShorthand(c, headers,
+ needRounding, unit, roundDown);
}
// The replacement string must have '$' and '\' chars escaped. This
@@ -243,7 +335,13 @@ public class BucketPath {
* mapping of an attribute name to the value based on the escape sequence
* found in the argument string.
*/
- public static Map<String, String> getEscapeMapping(String in, Map<String, String> headers) {
+ public static Map<String, String> getEscapeMapping(String in,
+ Map<String, String> headers) {
+ return getEscapeMapping(in, headers, false, 0, 0);
+ }
+ public static Map<String, String> getEscapeMapping(String in,
+ Map<String, String> headers, boolean needRounding,
+ int unit, int roundDown) {
Map<String, String> mapping = new HashMap<String, String>();
Matcher matcher = tagPattern.matcher(in);
while (matcher.find()) {
@@ -266,7 +364,8 @@ public class BucketPath {
&& matcher.group(1).length() == 1,
"Expected to match single character tag in string " + in);
char c = matcher.group(1).charAt(0);
- replacement = replaceShorthand(c, headers);
+ replacement = replaceShorthand(c, headers,
+ needRounding, unit, roundDown);
mapping.put(expandShorthand(c), replacement);
}
}
Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java?rev=1340527&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java Sat May 19 18:10:34 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.tools;
+
+import java.util.Calendar;
+
+import com.google.common.base.Preconditions;
+
+public class TimestampRoundDownUtil {
+
+ /**
+ * Rounds the seconds field of a timestamp down and clears the milliseconds.
+ * @param timestamp - The time stamp to be rounded down, in milliseconds. Must be > 0.
+ * @param roundDownSec - The seconds field of <tt>timestamp</tt> is rounded down to
+ * the largest multiple of <tt>roundDownSec</tt> not above it. Must be > 0 and <= 60.
+ * @return - Rounded down timestamp, in milliseconds
+ * @throws IllegalArgumentException if an argument fails its range check; the
+ * declared IllegalStateException is not what Preconditions.checkArgument raises
+ */
+ public static long roundDownTimeStampSeconds(long timestamp,
+ int roundDownSec) throws IllegalStateException {
+ Preconditions.checkArgument(roundDownSec > 0 && roundDownSec <=60,
+ "RoundDownSec must be > 0 and <=60");
+ Calendar cal = roundDownField(timestamp, Calendar.SECOND, roundDownSec);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTimeInMillis();
+ }
+
+ /**
+ * Rounds the minutes field of a timestamp down and clears seconds and milliseconds.
+ * @param timestamp - The time stamp to be rounded down, in milliseconds. Must be > 0.
+ * @param roundDownMins - The minutes field of <tt>timestamp</tt> is rounded down to
+ * the largest multiple of <tt>roundDownMins</tt> not above it. Must be > 0 and <= 60.
+ * @return - Rounded down timestamp, in milliseconds
+ * @throws IllegalArgumentException if an argument fails its range check; the
+ * declared IllegalStateException is not what Preconditions.checkArgument raises
+ */
+ public static long roundDownTimeStampMinutes(long timestamp,
+ int roundDownMins) throws IllegalStateException {
+ Preconditions.checkArgument(roundDownMins > 0 && roundDownMins <=60,
+ "RoundDown must be > 0 and <=60");
+ Calendar cal = roundDownField(timestamp, Calendar.MINUTE, roundDownMins);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTimeInMillis();
+
+ }
+
+ /**
+ * Rounds the hour-of-day field of a timestamp down and clears minutes, seconds and milliseconds.
+ * @param timestamp - The time stamp to be rounded down, in milliseconds. Must be > 0.
+ * @param roundDownHours - The hour of day of <tt>timestamp</tt> is rounded down to
+ * the largest multiple of <tt>roundDownHours</tt> not above it. Must be > 0 and <= 24.
+ * @return - Rounded down timestamp, in milliseconds
+ * @throws IllegalArgumentException if an argument fails its range check; the
+ * declared IllegalStateException is not what Preconditions.checkArgument raises
+ */
+ public static long roundDownTimeStampHours(long timestamp,
+ int roundDownHours) throws IllegalStateException {
+ Preconditions.checkArgument(roundDownHours > 0 && roundDownHours <=24,
+ "RoundDown must be > 0 and <=24");
+ Calendar cal = roundDownField(timestamp,
+ Calendar.HOUR_OF_DAY, roundDownHours);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTimeInMillis();
+ }
+ // Sets one calendar field to the largest multiple of roundDown not above its current value; smaller fields are left for the caller to clear.
+ private static Calendar roundDownField(
+ long timestamp, int field, int roundDown){
+ Preconditions.checkArgument(timestamp > 0, "Timestamp must be positive");
+ Calendar cal = Calendar.getInstance(); // default time zone and locale, so field values depend on the JVM's zone
+ cal.setTimeInMillis(timestamp);
+ int fieldVal = cal.get(field);
+ int remainder = (fieldVal % roundDown);
+ cal.set(field, fieldVal - remainder);
+ return cal;
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java?rev=1340527&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java Sat May 19 18:10:34 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBucketPath {
+ Calendar cal;
+ Map<String, String> headers;
+ @Before
+ public void setUp(){
+ cal = Calendar.getInstance();
+ cal.set(2012, 5, 23, 13, 46, 33); // Calendar months are zero-based: 5 = June
+ cal.set(Calendar.MILLISECOND, 234);
+ headers = new HashMap<String, String>();
+ headers.put("timestamp", String.valueOf(cal.getTimeInMillis()));
+ }
+ @Test
+ public void testDateFormatHours() { // %c with 12-hour rounding: 13:46:33.234 is expected as 12:00:00
+ String test = "%c";
+ String escapedString = BucketPath.escapeString(
+ test, headers, true, Calendar.HOUR_OF_DAY, 12);
+ System.out.println("Escaped String: " + escapedString);
+ Calendar cal2 = Calendar.getInstance();
+ cal2.set(2012, 5, 23, 12, 0, 0);
+ cal2.set(Calendar.MILLISECOND, 0);
+ SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date d = new Date(cal2.getTimeInMillis());
+ String expectedString = format.format(d);
+ System.out.println("Expected String: "+ expectedString);
+ Assert.assertEquals(expectedString, escapedString);
+ }
+ // %s with 5-minute rounding: 13:46:33.234 is expected as 13:45:00, expressed in seconds
+ @Test
+ public void testDateFormatMinutes() {
+ String test = "%s";
+ String escapedString = BucketPath.escapeString(
+ test, headers, true, Calendar.MINUTE, 5);
+ System.out.println("Escaped String: " + escapedString);
+ Calendar cal2 = Calendar.getInstance();
+ cal2.set(2012, 5, 23, 13, 45, 0);
+ cal2.set(Calendar.MILLISECOND, 0);
+ String expectedString = String.valueOf(cal2.getTimeInMillis()/1000);
+ System.out.println("Expected String: "+ expectedString);
+ Assert.assertEquals(expectedString, escapedString);
+ }
+ // %s with 5-second rounding: 13:46:33.234 is expected as 13:46:30, expressed in seconds
+ @Test
+ public void testDateFormatSeconds() {
+ String test = "%s";
+ String escapedString = BucketPath.escapeString(
+ test, headers, true, Calendar.SECOND, 5);
+ System.out.println("Escaped String: " + escapedString);
+ Calendar cal2 = Calendar.getInstance();
+ cal2.set(2012, 5, 23, 13, 46, 30);
+ cal2.set(Calendar.MILLISECOND, 0);
+ String expectedString = String.valueOf(cal2.getTimeInMillis()/1000);
+ System.out.println("Expected String: "+ expectedString);
+ Assert.assertEquals(expectedString, escapedString);
+ }
+ // needRounding = false: the unit and multiple are passed but the original timestamp is expected back
+ @Test
+ public void testNoRounding(){
+ String test = "%c";
+ String escapedString = BucketPath.escapeString(
+ test, headers, false, Calendar.HOUR_OF_DAY, 12);
+ System.out.println("Escaped String: " + escapedString);
+ SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date d = new Date(cal.getTimeInMillis());
+ String expectedString = format.format(d);
+ System.out.println("Expected String: "+ expectedString);
+ Assert.assertEquals(expectedString, escapedString);
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java?rev=1340527&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java Sat May 19 18:10:34 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.tools;
+
+
+import java.util.Calendar;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestTimestampRoundDownUtil {
+ // 60-second rounding: 15:12:54.020 is expected as 15:12:00 (Calendar months are zero-based: 5 = June)
+ @Test
+ public void testRoundDownTimeStampSeconds() {
+ Calendar cal = Calendar.getInstance();
+ cal.clear();
+ cal.set(2012, 5, 15, 15, 12, 54);
+ cal.set(Calendar.MILLISECOND, 20);
+ Calendar cal2 = Calendar.getInstance();
+ cal2.clear();
+ cal2.set(2012, 5, 15, 15, 12, 0);
+ cal2.set(Calendar.MILLISECOND, 0);
+ long timeToVerify = cal2.getTimeInMillis();
+ long ret = TimestampRoundDownUtil.
+ roundDownTimeStampSeconds(cal.getTimeInMillis(), 60);
+ System.out.println("Cal 1: " + cal.toString());
+ System.out.println("Cal 2: " + cal2.toString());
+ Assert.assertEquals(timeToVerify, ret);
+ }
+ // 5-minute rounding: 15:12:54.020 is expected as 15:10:00
+ @Test
+ public void testRoundDownTimeStampMinutes() {
+ Calendar cal = Calendar.getInstance();
+ cal.clear();
+ cal.set(2012, 5, 15, 15, 12, 54);
+ cal.set(Calendar.MILLISECOND, 20);
+ Calendar cal2 = Calendar.getInstance();
+ cal2.clear();
+ cal2.set(2012, 5, 15, 15, 10, 0);
+ cal2.set(Calendar.MILLISECOND, 0);
+ long timeToVerify = cal2.getTimeInMillis();
+ long ret = TimestampRoundDownUtil.
+ roundDownTimeStampMinutes(cal.getTimeInMillis(), 5);
+ System.out.println("Cal 1: " + cal.toString());
+ System.out.println("Cal 2: " + cal2.toString());
+ Assert.assertEquals(timeToVerify, ret);
+ }
+ // 2-hour rounding: 15:12:54.020 is expected as 14:00:00
+ @Test
+ public void testRoundDownTimeStampHours() {
+ Calendar cal = Calendar.getInstance();
+ cal.clear();
+ cal.set(2012, 5, 15, 15, 12, 54);
+ cal.set(Calendar.MILLISECOND, 20);
+ Calendar cal2 = Calendar.getInstance();
+ cal2.clear();
+ cal2.set(2012, 5, 15, 14, 0, 0);
+ cal2.set(Calendar.MILLISECOND, 0);
+ long timeToVerify = cal2.getTimeInMillis();
+ long ret = TimestampRoundDownUtil.
+ roundDownTimeStampHours(cal.getTimeInMillis(), 2);
+ System.out.println("Cal 1: " + ret);
+ System.out.println("Cal 2: " + cal2.toString());
+ Assert.assertEquals(timeToVerify, ret);
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1340527&r1=1340526&r2=1340527&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Sat May 19 18:10:34 2012
@@ -21,6 +21,7 @@ package org.apache.flume.sink.hdfs;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
@@ -119,6 +120,10 @@ public class HDFSEventSink extends Abstr
private String proxyUserName;
private UserGroupInformation proxyTicket;
+ private boolean needRounding = false;
+ private int roundUnit = Calendar.SECOND;
+ private int roundValue = 1;
+
private long callTimeout;
private Context context;
@@ -243,6 +248,32 @@ public class HDFSEventSink extends Abstr
if (!authenticate(path)) {
LOG.error("Failed to authenticate!");
}
+ needRounding = context.getBoolean("hdfs.round", false);
+
+ if(needRounding) {
+ String unit = context.getString("hdfs.roundUnit", "second");
+ if (unit.equalsIgnoreCase("hour")) {
+ this.roundUnit = Calendar.HOUR_OF_DAY;
+ } else if (unit.equalsIgnoreCase("minute")) {
+ this.roundUnit = Calendar.MINUTE;
+ } else if (unit.equalsIgnoreCase("second")){
+ this.roundUnit = Calendar.SECOND;
+ } else {
+ LOG.warn("Rounding unit is not valid, please set one of" +
+ "minute, hour, or second. Rounding will be disabled");
+ needRounding = false;
+ }
+ this.roundValue = context.getInteger("hdfs.roundValue", 1);
+ if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
+ Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
+ "Round value" +
+ "must be > 0 and <= 60");
+ } else if (roundUnit == Calendar.HOUR_OF_DAY){
+ Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
+ "Round value" +
+ "must be > 0 and <= 24");
+ }
+ }
}
private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -376,7 +407,8 @@ public class HDFSEventSink extends Abstr
}
// reconstruct the path name by substituting place holders
- String realPath = BucketPath.escapeString(path, event.getHeaders());
+ String realPath = BucketPath.escapeString(path, event.getHeaders(),
+ needRounding, roundUnit, roundValue);
BucketWriter bucketWriter = sfWriters.get(realPath);
// we haven't seen this file yet, so open it and cache the handle
@@ -448,7 +480,7 @@ public class HDFSEventSink extends Abstr
}
} finally {
// flush the buckets that still has pending data
- // this ensures that the data removed from channel
+ // this ensures that the data removed from channel
// by the current transaction is safely on disk
for (BucketWriter writer : writers) {
if (writer.isBatchComplete()) {