You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/05 11:55:07 UTC
svn commit: r1357531 -
/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
Author: hshreedharan
Date: Thu Jul 5 09:55:06 2012
New Revision: 1357531
URL: http://svn.apache.org/viewvc?rev=1357531&view=rev
Log:
FLUME-1338. Produce a helpful error message when the timestamp header is missing and time-based bucketing is in use.
(Jarek Jarcec Cecho via Hari Shreedharan)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java?rev=1357531&r1=1357530&r2=1357531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java Thu Jul 5 09:55:06 2012
@@ -139,6 +139,22 @@ public class BucketPath {
*/
public static String replaceShorthand(char c, Map<String, String> headers,
boolean needRounding, int unit, int roundDown) {
+
+ String timestampHeader = headers.get("timestamp");
+ long ts;
+ try {
+ ts = Long.valueOf(timestampHeader);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Flume wasn't able to parse timestamp header"
+ + " in the event to resolve time based bucketing. Please check that"
+ + " you're correctly populating timestamp header (for example using"
+ + " TimestampInterceptor source interceptor).", e);
+ }
+
+ if(needRounding){
+ ts = roundDown(roundDown, unit, ts);
+ }
+
// It's a date
String formatString = "";
switch (c) {
@@ -190,11 +206,6 @@ public class BucketPath {
formatString = "a";
break;
case 's':
- long ts = Long.valueOf(headers.get("timestamp"));
- if(needRounding){
- ts = roundDown(
- roundDown, unit, ts);
- }
return "" + (ts/1000);
case 'S':
formatString = "ss";
@@ -202,7 +213,7 @@ public class BucketPath {
case 't':
// This is different from unix date (which would insert a tab character
// here)
- return headers.get("timestamp");
+ return timestampHeader;
case 'y':
formatString = "yy";
break;
@@ -216,13 +227,9 @@ public class BucketPath {
// LOG.warn("Unrecognized escape in event format string: %" + c);
return "";
}
+
SimpleDateFormat format = new SimpleDateFormat(formatString);
- long ts = Long.valueOf(headers.get("timestamp"));
- long timestamp = ts;
- if(needRounding){
- timestamp = roundDown(roundDown, unit, ts);
- }
- Date date = new Date(timestamp);
+ Date date = new Date(ts);
return format.format(date);
}