You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/04/23 00:31:49 UTC

git commit: FLUME-2014. Race condition when using local timestamp with BucketPath

Updated Branches:
  refs/heads/trunk 6662b34c0 -> 0a855488a


FLUME-2014. Race condition when using local timestamp with BucketPath

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0a855488
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0a855488
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0a855488

Branch: refs/heads/trunk
Commit: 0a855488a152cf695e2e1e2f89b67ed2e3095422
Parents: 6662b34
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Apr 22 15:30:49 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Apr 22 15:30:49 2013 -0700

----------------------------------------------------------------------
 .../apache/flume/formatter/output/BucketPath.java  |   69 +++++++++++++--
 .../flume/formatter/output/TestBucketPath.java     |   29 ++++++
 2 files changed, 90 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0a855488/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
index 971c75c..bef4b1f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
@@ -49,10 +49,14 @@ public class BucketPath {
    * Returns true if in contains a substring matching TAG_REGEX (i.e. of the
    * form %{...} or %x.
    */
+  @VisibleForTesting
+  @Deprecated
   public static boolean containsTag(String in) {
     return tagPattern.matcher(in).find();
   }
 
+  @VisibleForTesting
+  @Deprecated
   public static String expandShorthand(char c) {
     // It's a date
     switch (c) {
@@ -116,16 +120,25 @@ public class BucketPath {
    *
    * Dates follow the same format as unix date, with a few exceptions.
    *
+   * <p>This static method will be REMOVED in a future version of Flume</p>
+   *
    */
+  @VisibleForTesting
+  @Deprecated
   public static String replaceShorthand(char c, Map<String, String> headers) {
     return replaceShorthand(c, headers, false, 0, 0);
   }
 
   /**
    * A wrapper around
-   * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)}
+   * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int,
+   * int, boolean)}
    * with the timezone set to the default.
+   *
+   * <p>This static method will be REMOVED in a future version of Flume</p>
    */
+  @VisibleForTesting
+  @Deprecated
   public static String replaceShorthand(char c, Map<String, String> headers,
       boolean needRounding, int unit, int roundDown) {
     return replaceShorthand(c, headers, null, needRounding, unit, roundDown,
@@ -140,6 +153,9 @@ public class BucketPath {
    * Returns the empty string if an escape is not recognized.
    *
    * Dates follow the same format as unix date, with a few exceptions.
+   *
+   * <p>This static method will be REMOVED in a future version of Flume</p>
+   *
    * @param c - The character to replace.
    * @param headers - Event headers
    * @param timeZone - The timezone to use for formatting the timestamp
@@ -153,22 +169,40 @@ public class BucketPath {
    * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
    * to the second/minute/hour immediately lower than the timestamp supplied.
    * Ignored if needRounding is false.
+   *
    * @return
    */
+  @VisibleForTesting
+  @Deprecated
   public static String replaceShorthand(char c, Map<String, String> headers,
     TimeZone timeZone, boolean needRounding, int unit, int roundDown,
     boolean useLocalTimestamp) {
-    long ts;
-    String timestampHeader;
+    // Read the clock once; ts is overwritten when the header is used instead.
+    long ts = useLocalTimestamp ? clock.currentTimeMillis() : 0;
+    // Forward the caller's flag: a hard-coded false discards ts and demands
+    // a "timestamp" header even though local time was requested.
+    return replaceShorthand(c, headers, timeZone, needRounding, unit,
+        roundDown, useLocalTimestamp, ts);
+  }
+
+  /**
+   * Not intended as a public API
+   */
+  @VisibleForTesting
+  protected static String replaceShorthand(char c, Map<String, String> headers,
+      TimeZone timeZone, boolean needRounding, int unit, int roundDown,
+      boolean useLocalTimestamp, long ts) {
+
+    String timestampHeader = null;
     try {
       if(!useLocalTimestamp) {
         timestampHeader = headers.get("timestamp");
         Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
           "the Flume event headers, but it was null");
+        ts = Long.valueOf(timestampHeader);
       } else {
-        timestampHeader = String.valueOf(clock.currentTimeMillis());
+        timestampHeader = String.valueOf(ts);
       }
-      ts = Long.valueOf(timestampHeader);
     } catch (NumberFormatException e) {
       throw new RuntimeException("Flume wasn't able to parse timestamp header"
         + " in the event to resolve time based bucketing. Please check that"
@@ -302,7 +336,8 @@ public class BucketPath {
 
   /**
    * A wrapper around
-   * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)}
+   * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int,
+   * boolean)}
    * with the timezone set to the default.
    */
   public static String escapeString(String in, Map<String, String> headers,
@@ -335,6 +370,9 @@ public class BucketPath {
   public static String escapeString(String in, Map<String, String> headers,
     TimeZone timeZone, boolean needRounding, int unit, int roundDown,
     boolean useLocalTimeStamp) {
+
+    long ts = clock.currentTimeMillis();
+
     Matcher matcher = tagPattern.matcher(in);
     StringBuffer sb = new StringBuffer();
     while (matcher.find()) {
@@ -356,7 +394,7 @@ public class BucketPath {
             "Expected to match single character tag in string " + in);
         char c = matcher.group(1).charAt(0);
         replacement = replaceShorthand(c, headers, timeZone,
-            needRounding, unit, roundDown, useLocalTimeStamp);
+            needRounding, unit, roundDown, useLocalTimeStamp, ts);
       }
 
       // The replacement string must have '$' and '\' chars escaped. This
@@ -383,10 +421,15 @@ public class BucketPath {
    * mapping of an attribute name to the value based on the escape sequence
    * found in the argument string.
    */
+  @VisibleForTesting
+  @Deprecated
   public static Map<String, String> getEscapeMapping(String in,
       Map<String, String> headers) {
     return getEscapeMapping(in, headers, false, 0, 0);
   }
+
+  @VisibleForTesting
+  @Deprecated
   public static Map<String, String> getEscapeMapping(String in,
       Map<String, String> headers, boolean needRounding,
       int unit, int roundDown) {
@@ -421,10 +464,20 @@ public class BucketPath {
 
   }
 
-  //Should not be called from outside unit tests.
+  /*
+   * Test hook: replaces the static clock. Must not be called outside tests.
+   */
   @VisibleForTesting
   public static void setClock(Clock clk) {
     clock = clk;
   }
+
+  /*
+   * Test hook: returns the current clock. Must not be called outside tests.
+   */
+  @VisibleForTesting
+  public static Clock getClock() {
+    return clock;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/0a855488/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
index 9cfefc0..c441c4a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
@@ -26,10 +26,16 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.flume.Clock;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestBucketPath {
   Calendar cal;
   Map<String, String> headers;
@@ -112,4 +118,27 @@ public class TestBucketPath {
     System.out.println("Expected String: "+ expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
+
+  @Test
+  public void testDateRace() {
+    // The mock clock crosses an hour boundary between its first two reads.
+    Clock mockClock = mock(Clock.class);
+    DateTimeFormatter parser = ISODateTimeFormat.dateTimeParser();
+    long two = parser.parseMillis("2013-04-21T02:59:59-00:00");
+    long three = parser.parseMillis("2013-04-21T03:00:00-00:00");
+    when(mockClock.currentTimeMillis()).thenReturn(two, three);
+    // save & modify static state (yuck)
+    Clock origClock = BucketPath.getClock();
+    BucketPath.setClock(mockClock);
+    try {
+      String escaped = BucketPath.escapeString("%H:%M",
+          new HashMap<String, String>(),
+          TimeZone.getTimeZone("UTC"), true, Calendar.MINUTE, 10, true);
+      // %H and %M must come from one clock read: 02:59:59 rounds to 02:50.
+      Assert.assertEquals("Race condition detected", "02:50", escaped);
+    } finally {
+      // restore static state even if escapeString or the assertion throws
+      BucketPath.setClock(origClock);
+    }
+  }
 }