You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@flume.apache.org by mp...@apache.org on 2013/02/27 01:37:11 UTC

git commit: FLUME-1922. HDFS Sink should optionally insert the timestamp at the sink.

Updated Branches:
  refs/heads/trunk 7006510b8 -> 629314622


FLUME-1922. HDFS Sink should optionally insert the timestamp at the sink.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/62931462
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/62931462
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/62931462

Branch: refs/heads/trunk
Commit: 6293146224a261d340b39824ce7dadcda9b1efd7
Parents: 7006510
Author: Mike Percy <mp...@apache.org>
Authored: Tue Feb 26 16:27:54 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Feb 26 16:27:54 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/formatter/output/BucketPath.java  |   35 +++++-
 .../flume/formatter/output/TestBucketPath.java     |    2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    3 +-
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   19 +++-
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   91 +++++++++++++++
 5 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
index fcc26f2..971c75c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
@@ -27,6 +27,9 @@ import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flume.Clock;
+import org.apache.flume.SystemClock;
 import org.apache.flume.tools.TimestampRoundDownUtil;
 
 import com.google.common.base.Preconditions;
@@ -40,6 +43,8 @@ public class BucketPath {
   final public static String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}";
   final public static Pattern tagPattern = Pattern.compile(TAG_REGEX);
 
+  private static Clock clock = new SystemClock();
+
   /**
    * Returns true if in contains a substring matching TAG_REGEX (i.e. of the
    * form %{...} or %x.
@@ -123,7 +128,8 @@ public class BucketPath {
    */
   public static String replaceShorthand(char c, Map<String, String> headers,
       boolean needRounding, int unit, int roundDown) {
-    return replaceShorthand(c, headers, null, needRounding, unit, roundDown);
+    return replaceShorthand(c, headers, null, needRounding, unit, roundDown,
+      false);
   }
 
   /**
@@ -150,11 +156,18 @@ public class BucketPath {
    * @return
    */
   public static String replaceShorthand(char c, Map<String, String> headers,
-      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
-
-    String timestampHeader = headers.get("timestamp");
+    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
+    boolean useLocalTimestamp) {
     long ts;
+    String timestampHeader;
     try {
+      if(!useLocalTimestamp) {
+        timestampHeader = headers.get("timestamp");
+        Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
+          "the Flume event headers, but it was null");
+      } else {
+        timestampHeader = String.valueOf(clock.currentTimeMillis());
+      }
       ts = Long.valueOf(timestampHeader);
     } catch (NumberFormatException e) {
       throw new RuntimeException("Flume wasn't able to parse timestamp header"
@@ -294,7 +307,8 @@ public class BucketPath {
    */
   public static String escapeString(String in, Map<String, String> headers,
       boolean needRounding, int unit, int roundDown) {
-    return escapeString(in, headers, null, needRounding, unit, roundDown);
+    return escapeString(in, headers, null, needRounding, unit, roundDown,
+      false);
   }
 
   /**
@@ -319,7 +333,8 @@ public class BucketPath {
    * @return Escaped string.
    */
   public static String escapeString(String in, Map<String, String> headers,
-      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
+    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
+    boolean useLocalTimeStamp) {
     Matcher matcher = tagPattern.matcher(in);
     StringBuffer sb = new StringBuffer();
     while (matcher.find()) {
@@ -341,7 +356,7 @@ public class BucketPath {
             "Expected to match single character tag in string " + in);
         char c = matcher.group(1).charAt(0);
         replacement = replaceShorthand(c, headers, timeZone,
-            needRounding, unit, roundDown);
+            needRounding, unit, roundDown, useLocalTimeStamp);
       }
 
       // The replacement string must have '$' and '\' chars escaped. This
@@ -405,5 +420,11 @@ public class BucketPath {
     return mapping;
 
   }
+
+  // Test hook only: replaces the static clock used when useLocalTimestamp is true; restore it after use.
+  @VisibleForTesting
+  public static void setClock(Clock clk) { // static, non-volatile: affects every BucketPath caller in this class loader
+    clock = clk;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
index 090b3a8..9cfefc0 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
@@ -103,7 +103,7 @@ public class TestBucketPath {
     TimeZone utcTimeZone = TimeZone.getTimeZone("UTC");
     String test = "%c";
     String escapedString = BucketPath.escapeString(
-        test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12);
+        test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12, false);
     System.out.println("Escaped String: " + escapedString);
     SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
     format.setTimeZone(utcTimeZone);

http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8a4ecda..5ac903e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1345,7 +1345,7 @@ complete files in the directory.
 Required properties are in **bold**.
 
 .. note:: For all of the time related escape sequences, a header with the key
-          "timestamp" must exist among the headers of the event. One way to add
+          "timestamp" must exist among the headers of the event (unless ``hdfs.useLocalTimeStamp`` is set to ``true``). One way to add
           this automatically is to use the TimestampInterceptor.
 
 ======================  ============  ======================================================================
@@ -1383,6 +1383,7 @@ hdfs.round              false         Should the timestamp be rounded down (if t
 hdfs.roundValue         1             Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
 hdfs.roundUnit          second        The unit of the round down value - ``second``, ``minute`` or ``hour``.
 hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
+hdfs.useLocalTimeStamp  false         Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
 serializer              ``TEXT``      Other possible options include ``avro_event`` or the
                                       fully-qualified class name of an implementation of the
                                       ``EventSerializer.Builder`` interface.

http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index e980d13..76e3d1f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -37,10 +37,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Channel;
+import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
@@ -134,12 +137,14 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private boolean needRounding = false;
   private int roundUnit = Calendar.SECOND;
   private int roundValue = 1;
+  private boolean useLocalTime = false;
 
   private long callTimeout;
   private Context context;
   private SinkCounter sinkCounter;
 
   private volatile int idleTimeout;
+  private Clock clock;
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue.
@@ -268,6 +273,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       }
     }
 
+    this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
+    if(useLocalTime) {
+      clock = new SystemClock();
+    }
+
     if (sinkCounter == null) {
       sinkCounter = new SinkCounter(getName());
     }
@@ -390,9 +400,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
         // reconstruct the path name by substituting place holders
         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
-            timeZone, needRounding, roundUnit, roundValue);
+            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
-          timeZone, needRounding, roundUnit, roundValue);
+          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
 
         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
         BucketWriter bucketWriter = sfWriters.get(lookupPath);
@@ -759,4 +769,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       }
     });
   }
+
+  @VisibleForTesting // test hook only; not used on the production path
+  void setBucketClock(Clock clock) { // delegates to BucketPath.setClock, i.e. changes BucketPath's shared static clock
+    BucketPath.setClock(clock);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index cdddd50..5b7cec9 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -35,10 +35,12 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
+import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink.Status;
+import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
@@ -508,6 +510,95 @@ public class TestHDFSEventSink {
   }
 
   @Test
+  public void testSimpleAppendLocalTime() throws InterruptedException,
+    LifecycleException, EventDeliveryException, IOException {
+    final long currentTime = System.currentTimeMillis(); // fixed "now" injected into BucketPath below
+    Clock clk = new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        return currentTime;
+      }
+    };
+
+    LOG.debug("Starting...");
+    final String fileName = "FlumeData";
+    final long rollCount = 5;
+    final long batchSize = 2;
+    final int numBatches = 4;
+    String newPath = testPath + "/singleBucket/%s" ; // %s expands to epoch seconds of the injected clock
+    String expectedPath = testPath + "/singleBucket/" +
+      String.valueOf(currentTime/1000);
+    int totalEvents = 0;
+    int i = 1, j = 1;
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(expectedPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.useLocalTimeStamp", String.valueOf(true));
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.setBucketClock(clk);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+    List<String> bodies = Lists.newArrayList();
+
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) { // NOTE(review): runs numBatches - 1 batches
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (j = 1; j <= batchSize; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // year, month, day, hour, minute; should be ignored since useLocalTimeStamp=true
+        event.getHeaders().put("timestamp",
+          String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
+        channel.put(event);
+        totalEvents++;
+      }
+      txn.commit();
+      txn.close();
+
+      // execute sink to process the events
+      sink.process();
+    }
+
+    sink.stop();
+
+    // loop through all the files generated and check their contents
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path fList[] = FileUtil.stat2Paths(dirStat);
+
+    // check that the roll happened correctly for the given data
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+      Lists.newArrayList(fList), expectedFiles, fList.length);
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    // BucketPath's clock is static, so restore the real clock (not in a finally: skipped if an assert above fails)
+    sink.setBucketClock(new SystemClock());
+  }
+
+  @Test
   public void testAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {