You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/04/23 00:31:49 UTC
git commit: FLUME-2014. Race condition when using local timestamp
with BucketPath
Updated Branches:
refs/heads/trunk 6662b34c0 -> 0a855488a
FLUME-2014. Race condition when using local timestamp with BucketPath
(Mike Percy via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0a855488
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0a855488
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0a855488
Branch: refs/heads/trunk
Commit: 0a855488a152cf695e2e1e2f89b67ed2e3095422
Parents: 6662b34
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Apr 22 15:30:49 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Apr 22 15:30:49 2013 -0700
----------------------------------------------------------------------
.../apache/flume/formatter/output/BucketPath.java | 69 +++++++++++++--
.../flume/formatter/output/TestBucketPath.java | 29 ++++++
2 files changed, 90 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/0a855488/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
index 971c75c..bef4b1f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
@@ -49,10 +49,14 @@ public class BucketPath {
* Returns true if in contains a substring matching TAG_REGEX (i.e. of the
* form %{...} or %x).
*/
+ @VisibleForTesting
+ @Deprecated
public static boolean containsTag(String in) {
return tagPattern.matcher(in).find();
}
+ @VisibleForTesting
+ @Deprecated
public static String expandShorthand(char c) {
// It's a date
switch (c) {
@@ -116,16 +120,25 @@ public class BucketPath {
*
* Dates follow the same format as unix date, with a few exceptions.
*
+ * <p>This static method will be REMOVED in a future version of Flume</p>
+ *
*/
+ @VisibleForTesting
+ @Deprecated
public static String replaceShorthand(char c, Map<String, String> headers) {
return replaceShorthand(c, headers, false, 0, 0);
}
/**
* A wrapper around
- * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)}
+ * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int,
+ * int, boolean)}
* with the timezone set to the default.
+ *
+ * <p>This static method will be REMOVED in a future version of Flume</p>
*/
+ @VisibleForTesting
+ @Deprecated
public static String replaceShorthand(char c, Map<String, String> headers,
boolean needRounding, int unit, int roundDown) {
return replaceShorthand(c, headers, null, needRounding, unit, roundDown,
@@ -140,6 +153,9 @@ public class BucketPath {
* Returns the empty string if an escape is not recognized.
*
* Dates follow the same format as unix date, with a few exceptions.
+ *
+ * <p>This static method will be REMOVED in a future version of Flume</p>
+ *
* @param c - The character to replace.
* @param headers - Event headers
* @param timeZone - The timezone to use for formatting the timestamp
@@ -153,22 +169,40 @@ public class BucketPath {
* value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
* to the second/minute/hour immediately lower than the timestamp supplied.
* Ignored if needRounding is false.
+ *
* @return
*/
+ @VisibleForTesting
+ @Deprecated
public static String replaceShorthand(char c, Map<String, String> headers,
TimeZone timeZone, boolean needRounding, int unit, int roundDown,
boolean useLocalTimestamp) {
- long ts;
- String timestampHeader;
+ long ts = 0;
+ if (useLocalTimestamp) {
+ ts = clock.currentTimeMillis();
+ }
+ return replaceShorthand(c, headers, timeZone, needRounding, unit,
+ roundDown, false, ts);
+ }
+
+ /**
+ * Not intended as a public API
+ */
+ @VisibleForTesting
+ protected static String replaceShorthand(char c, Map<String, String> headers,
+ TimeZone timeZone, boolean needRounding, int unit, int roundDown,
+ boolean useLocalTimestamp, long ts) {
+
+ String timestampHeader = null;
try {
if(!useLocalTimestamp) {
timestampHeader = headers.get("timestamp");
Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
"the Flume event headers, but it was null");
+ ts = Long.valueOf(timestampHeader);
} else {
- timestampHeader = String.valueOf(clock.currentTimeMillis());
+ timestampHeader = String.valueOf(ts);
}
- ts = Long.valueOf(timestampHeader);
} catch (NumberFormatException e) {
throw new RuntimeException("Flume wasn't able to parse timestamp header"
+ " in the event to resolve time based bucketing. Please check that"
@@ -302,7 +336,8 @@ public class BucketPath {
/**
* A wrapper around
- * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)}
+ * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int,
+ * boolean)}
* with the timezone set to the default.
*/
public static String escapeString(String in, Map<String, String> headers,
@@ -335,6 +370,9 @@ public class BucketPath {
public static String escapeString(String in, Map<String, String> headers,
TimeZone timeZone, boolean needRounding, int unit, int roundDown,
boolean useLocalTimeStamp) {
+
+ long ts = clock.currentTimeMillis();
+
Matcher matcher = tagPattern.matcher(in);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
@@ -356,7 +394,7 @@ public class BucketPath {
"Expected to match single character tag in string " + in);
char c = matcher.group(1).charAt(0);
replacement = replaceShorthand(c, headers, timeZone,
- needRounding, unit, roundDown, useLocalTimeStamp);
+ needRounding, unit, roundDown, useLocalTimeStamp, ts);
}
// The replacement string must have '$' and '\' chars escaped. This
@@ -383,10 +421,15 @@ public class BucketPath {
* mapping of an attribute name to the value based on the escape sequence
* found in the argument string.
*/
+ @VisibleForTesting
+ @Deprecated
public static Map<String, String> getEscapeMapping(String in,
Map<String, String> headers) {
return getEscapeMapping(in, headers, false, 0, 0);
}
+
+ @VisibleForTesting
+ @Deprecated
public static Map<String, String> getEscapeMapping(String in,
Map<String, String> headers, boolean needRounding,
int unit, int roundDown) {
@@ -421,10 +464,20 @@ public class BucketPath {
}
- //Should not be called from outside unit tests.
+ /*
+ * May not be called from outside unit tests.
+ */
@VisibleForTesting
public static void setClock(Clock clk) {
clock = clk;
}
+
+ /*
+ * May not be called from outside unit tests.
+ */
+ @VisibleForTesting
+ public static Clock getClock() {
+ return clock;
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/0a855488/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
index 9cfefc0..c441c4a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
@@ -26,10 +26,16 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.flume.Clock;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestBucketPath {
Calendar cal;
Map<String, String> headers;
@@ -112,4 +118,27 @@ public class TestBucketPath {
System.out.println("Expected String: "+ expectedString);
Assert.assertEquals(expectedString, escapedString);
}
+
+ @Test
+ public void testDateRace() {
+ Clock mockClock = mock(Clock.class);
+ DateTimeFormatter parser = ISODateTimeFormat.dateTimeParser();
+ long two = parser.parseMillis("2013-04-21T02:59:59-00:00");
+ long three = parser.parseMillis("2013-04-21T03:00:00-00:00");
+ when(mockClock.currentTimeMillis()).thenReturn(two, three);
+
+ // save & modify static state (yuck)
+ Clock origClock = BucketPath.getClock();
+ BucketPath.setClock(mockClock);
+
+ String pat = "%H:%M";
+ String escaped = BucketPath.escapeString(pat,
+ new HashMap<String, String>(),
+ TimeZone.getTimeZone("UTC"), true, Calendar.MINUTE, 10, true);
+
+ // restore static state
+ BucketPath.setClock(origClock);
+
+ Assert.assertEquals("Race condition detected", "02:50", escaped);
+ }
}