You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/02/27 01:37:11 UTC
git commit: FLUME-1922. HDFS Sink should optionally insert the
timestamp at the sink.
Updated Branches:
refs/heads/trunk 7006510b8 -> 629314622
FLUME-1922. HDFS Sink should optionally insert the timestamp at the sink.
(Hari Shreedharan via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/62931462
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/62931462
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/62931462
Branch: refs/heads/trunk
Commit: 6293146224a261d340b39824ce7dadcda9b1efd7
Parents: 7006510
Author: Mike Percy <mp...@apache.org>
Authored: Tue Feb 26 16:27:54 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Feb 26 16:27:54 2013 -0800
----------------------------------------------------------------------
.../apache/flume/formatter/output/BucketPath.java | 35 +++++-
.../flume/formatter/output/TestBucketPath.java | 2 +-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 3 +-
.../org/apache/flume/sink/hdfs/HDFSEventSink.java | 19 +++-
.../apache/flume/sink/hdfs/TestHDFSEventSink.java | 91 +++++++++++++++
5 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
index fcc26f2..971c75c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
@@ -27,6 +27,9 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flume.Clock;
+import org.apache.flume.SystemClock;
import org.apache.flume.tools.TimestampRoundDownUtil;
import com.google.common.base.Preconditions;
@@ -40,6 +43,8 @@ public class BucketPath {
final public static String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}";
final public static Pattern tagPattern = Pattern.compile(TAG_REGEX);
+ private static Clock clock = new SystemClock();
+
/**
* Returns true if in contains a substring matching TAG_REGEX (i.e. of the
* form %{...} or %x.
@@ -123,7 +128,8 @@ public class BucketPath {
*/
public static String replaceShorthand(char c, Map<String, String> headers,
boolean needRounding, int unit, int roundDown) {
- return replaceShorthand(c, headers, null, needRounding, unit, roundDown);
+ return replaceShorthand(c, headers, null, needRounding, unit, roundDown,
+ false);
}
/**
@@ -150,11 +156,18 @@ public class BucketPath {
* @return
*/
public static String replaceShorthand(char c, Map<String, String> headers,
- TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
-
- String timestampHeader = headers.get("timestamp");
+ TimeZone timeZone, boolean needRounding, int unit, int roundDown,
+ boolean useLocalTimestamp) {
long ts;
+ String timestampHeader;
try {
+ if(!useLocalTimestamp) {
+ timestampHeader = headers.get("timestamp");
+ Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
+ "the Flume event headers, but it was null");
+ } else {
+ timestampHeader = String.valueOf(clock.currentTimeMillis());
+ }
ts = Long.valueOf(timestampHeader);
} catch (NumberFormatException e) {
throw new RuntimeException("Flume wasn't able to parse timestamp header"
@@ -294,7 +307,8 @@ public class BucketPath {
*/
public static String escapeString(String in, Map<String, String> headers,
boolean needRounding, int unit, int roundDown) {
- return escapeString(in, headers, null, needRounding, unit, roundDown);
+ return escapeString(in, headers, null, needRounding, unit, roundDown,
+ false);
}
/**
@@ -319,7 +333,8 @@ public class BucketPath {
* @return Escaped string.
*/
public static String escapeString(String in, Map<String, String> headers,
- TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
+ TimeZone timeZone, boolean needRounding, int unit, int roundDown,
+ boolean useLocalTimeStamp) {
Matcher matcher = tagPattern.matcher(in);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
@@ -341,7 +356,7 @@ public class BucketPath {
"Expected to match single character tag in string " + in);
char c = matcher.group(1).charAt(0);
replacement = replaceShorthand(c, headers, timeZone,
- needRounding, unit, roundDown);
+ needRounding, unit, roundDown, useLocalTimeStamp);
}
// The replacement string must have '$' and '\' chars escaped. This
@@ -405,5 +420,11 @@ public class BucketPath {
return mapping;
}
+
+ //Should not be called from outside unit tests.
+ @VisibleForTesting
+ public static void setClock(Clock clk) {
+ clock = clk;
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
index 090b3a8..9cfefc0 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
@@ -103,7 +103,7 @@ public class TestBucketPath {
TimeZone utcTimeZone = TimeZone.getTimeZone("UTC");
String test = "%c";
String escapedString = BucketPath.escapeString(
- test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12);
+ test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12, false);
System.out.println("Escaped String: " + escapedString);
SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
format.setTimeZone(utcTimeZone);
http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8a4ecda..5ac903e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1345,7 +1345,7 @@ complete files in the directory.
Required properties are in **bold**.
.. note:: For all of the time related escape sequences, a header with the key
- "timestamp" must exist among the headers of the event. One way to add
+ "timestamp" must exist among the headers of the event (unless ``hdfs.useLocalTimeStamp`` is set to ``true``). One way to add
this automatically is to use the TimestampInterceptor.
====================== ============ ======================================================================
@@ -1383,6 +1383,7 @@ hdfs.round false Should the timestamp be rounded down (if t
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
hdfs.roundUnit second The unit of the round down value - ``second``, ``minute`` or ``hour``.
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
+hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
serializer ``TEXT`` Other possible options include ``avro_event`` or the
fully-qualified class name of an implementation of the
``EventSerializer.Builder`` interface.
http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index e980d13..76e3d1f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -37,10 +37,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.Channel;
+import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
@@ -134,12 +137,14 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
private boolean needRounding = false;
private int roundUnit = Calendar.SECOND;
private int roundValue = 1;
+ private boolean useLocalTime = false;
private long callTimeout;
private Context context;
private SinkCounter sinkCounter;
private volatile int idleTimeout;
+ private Clock clock;
/*
* Extended Java LinkedHashMap for open file handle LRU queue.
@@ -268,6 +273,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
}
}
+ this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
+ if(useLocalTime) {
+ clock = new SystemClock();
+ }
+
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
@@ -390,9 +400,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
// reconstruct the path name by substituting place holders
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
- timeZone, needRounding, roundUnit, roundValue);
+ timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
- timeZone, needRounding, roundUnit, roundValue);
+ timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter = sfWriters.get(lookupPath);
@@ -759,4 +769,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
}
});
}
+
+ @VisibleForTesting
+ void setBucketClock(Clock clock) {
+ BucketPath.setClock(clock);
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index cdddd50..5b7cec9 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -35,10 +35,12 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
+import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink.Status;
+import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
@@ -508,6 +510,95 @@ public class TestHDFSEventSink {
}
@Test
+ public void testSimpleAppendLocalTime() throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
+ final long currentTime = System.currentTimeMillis();
+ Clock clk = new Clock() {
+ @Override
+ public long currentTimeMillis() {
+ return currentTime;
+ }
+ };
+
+ LOG.debug("Starting...");
+ final String fileName = "FlumeData";
+ final long rollCount = 5;
+ final long batchSize = 2;
+ final int numBatches = 4;
+ String newPath = testPath + "/singleBucket/%s" ;
+ String expectedPath = testPath + "/singleBucket/" +
+ String.valueOf(currentTime/1000);
+ int totalEvents = 0;
+ int i = 1, j = 1;
+
+ // clear the test directory
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path(expectedPath);
+ fs.delete(dirPath, true);
+ fs.mkdirs(dirPath);
+
+ Context context = new Context();
+
+ context.put("hdfs.path", newPath);
+ context.put("hdfs.filePrefix", fileName);
+ context.put("hdfs.rollCount", String.valueOf(rollCount));
+ context.put("hdfs.batchSize", String.valueOf(batchSize));
+ context.put("hdfs.useLocalTimeStamp", String.valueOf(true));
+
+ Configurables.configure(sink, context);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, context);
+
+ sink.setChannel(channel);
+ sink.setBucketClock(clk);
+ sink.start();
+
+ Calendar eventDate = Calendar.getInstance();
+ List<String> bodies = Lists.newArrayList();
+
+ // push the event batches into channel
+ for (i = 1; i < numBatches; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (j = 1; j <= batchSize; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // yy mm dd
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+ String body = "Test." + i + "." + j;
+ event.setBody(body.getBytes());
+ bodies.add(body);
+ channel.put(event);
+ totalEvents++;
+ }
+ txn.commit();
+ txn.close();
+
+ // execute sink to process the events
+ sink.process();
+ }
+
+ sink.stop();
+
+ // loop through all the files generated and check their contents
+ FileStatus[] dirStat = fs.listStatus(dirPath);
+ Path fList[] = FileUtil.stat2Paths(dirStat);
+
+ // check that the roll happened correctly for the given data
+ long expectedFiles = totalEvents / rollCount;
+ if (totalEvents % rollCount > 0) expectedFiles++;
+ Assert.assertEquals("num files wrong, found: " +
+ Lists.newArrayList(fList), expectedFiles, fList.length);
+ verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+ // The clock in bucketpath is static, so restore the real clock
+ sink.setBucketClock(new SystemClock());
+ }
+
+ @Test
public void testAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {