You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/09/11 19:52:58 UTC
flume git commit: FLUME-1520. Timestamp interceptor should support
custom headers
Repository: flume
Updated Branches:
refs/heads/trunk 773555c5c -> 13771c905
FLUME-1520. Timestamp interceptor should support custom headers
This change adds a configuration parameter to the TimestampInterceptor
for the user to be able to define the name of the timestamp header.
Reviewers: Tristan Stevens, Attila Simon
(Hari Shreedharan, Tristan Stevens, Attila Simon via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13771c90
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13771c90
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13771c90
Branch: refs/heads/trunk
Commit: 13771c905316052d3e94aeb3b4a0d49a27c0f852
Parents: 773555c
Author: Denes Arvay <de...@apache.org>
Authored: Mon Sep 11 15:16:45 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Mon Sep 11 21:48:19 2017 +0200
----------------------------------------------------------------------
.../flume/interceptor/TimestampInterceptor.java | 27 ++++++++-----
.../interceptor/TestTimestampInterceptor.java | 41 ++++++++++++++------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 19 ++++-----
3 files changed, 56 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
index 50c3695..4ed6387 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
@@ -28,18 +28,22 @@ import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;
/**
* Simple Interceptor class that sets the current system timestamp on all events
* that are intercepted.
- * By convention, this timestamp header is named "timestamp" and its format
+ * By convention, this timestamp header is named "timestamp" by default and its format
* is a "stringified" long timestamp in milliseconds since the UNIX epoch.
+ * The name of the header can be changed through the configuration using the
+ * config key "headerName".
*/
public class TimestampInterceptor implements Interceptor {
private final boolean preserveExisting;
+ private final String header;
/**
* Only {@link TimestampInterceptor.Builder} can build me
*/
- private TimestampInterceptor(boolean preserveExisting) {
+ private TimestampInterceptor(boolean preserveExisting, String header) {
this.preserveExisting = preserveExisting;
+ this.header = header;
}
@Override
@@ -53,11 +57,11 @@ public class TimestampInterceptor implements Interceptor {
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
- if (preserveExisting && headers.containsKey(TIMESTAMP)) {
+ if (preserveExisting && headers.containsKey(header)) {
// we must preserve the existing timestamp
} else {
long now = System.currentTimeMillis();
- headers.put(TIMESTAMP, Long.toString(now));
+ headers.put(header, Long.toString(now));
}
return event;
}
@@ -85,24 +89,27 @@ public class TimestampInterceptor implements Interceptor {
*/
public static class Builder implements Interceptor.Builder {
- private boolean preserveExisting = PRESERVE_DFLT;
+ private boolean preserveExisting = DEFAULT_PRESERVE;
+ private String header = DEFAULT_HEADER_NAME;
@Override
public Interceptor build() {
- return new TimestampInterceptor(preserveExisting);
+ return new TimestampInterceptor(preserveExisting, header);
}
@Override
public void configure(Context context) {
- preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
+ preserveExisting = context.getBoolean(CONFIG_PRESERVE, DEFAULT_PRESERVE);
+ header = context.getString(CONFIG_HEADER_NAME, DEFAULT_HEADER_NAME);
}
}
public static class Constants {
- public static String TIMESTAMP = "timestamp";
- public static String PRESERVE = "preserveExisting";
- public static boolean PRESERVE_DFLT = false;
+ public static final String CONFIG_PRESERVE = "preserveExisting";
+ public static final boolean DEFAULT_PRESERVE = false;
+ public static final String CONFIG_HEADER_NAME = "headerName";
+ public static final String DEFAULT_HEADER_NAME = "timestamp";
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
index 3d3eeee..06648c6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
@@ -35,17 +35,16 @@ public class TestTimestampInterceptor {
public void testBasic() throws ClassNotFoundException, InstantiationException,
IllegalAccessException {
- InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
InterceptorType.TIMESTAMP.toString());
Interceptor interceptor = builder.build();
Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
- Assert.assertNull(event.getHeaders().get(Constants.TIMESTAMP));
+ Assert.assertNull(event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
Long now = System.currentTimeMillis();
event = interceptor.intercept(event);
- String timestampStr = event.getHeaders().get(Constants.TIMESTAMP);
+ String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME);
Assert.assertNotNull(timestampStr);
Assert.assertTrue(Long.parseLong(timestampStr) >= now);
}
@@ -60,7 +59,6 @@ public class TestTimestampInterceptor {
Context ctx = new Context();
ctx.put("preserveExisting", "true");
- InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
InterceptorType.TIMESTAMP.toString());
builder.configure(ctx);
@@ -68,13 +66,12 @@ public class TestTimestampInterceptor {
long originalTs = 1L;
Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
- event.getHeaders().put(Constants.TIMESTAMP, Long.toString(originalTs));
+ event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs));
Assert.assertEquals(Long.toString(originalTs),
- event.getHeaders().get(Constants.TIMESTAMP));
+ event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
- Long now = System.currentTimeMillis();
event = interceptor.intercept(event);
- String timestampStr = event.getHeaders().get(Constants.TIMESTAMP);
+ String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME);
Assert.assertNotNull(timestampStr);
Assert.assertTrue(Long.parseLong(timestampStr) == originalTs);
}
@@ -89,7 +86,6 @@ public class TestTimestampInterceptor {
Context ctx = new Context();
ctx.put("preserveExisting", "false"); // DEFAULT BEHAVIOR
- InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
InterceptorType.TIMESTAMP.toString());
builder.configure(ctx);
@@ -97,15 +93,36 @@ public class TestTimestampInterceptor {
long originalTs = 1L;
Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
- event.getHeaders().put(Constants.TIMESTAMP, Long.toString(originalTs));
+ event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs));
Assert.assertEquals(Long.toString(originalTs),
- event.getHeaders().get(Constants.TIMESTAMP));
+ event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
Long now = System.currentTimeMillis();
event = interceptor.intercept(event);
- String timestampStr = event.getHeaders().get(Constants.TIMESTAMP);
+ String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME);
Assert.assertNotNull(timestampStr);
Assert.assertTrue(Long.parseLong(timestampStr) >= now);
}
+ @Test
+ public void testCustomHeader() throws Exception {
+ Context ctx = new Context();
+ ctx.put(TimestampInterceptor.Constants.CONFIG_HEADER_NAME, "timestampHeader");
+ Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+ InterceptorType.TIMESTAMP.toString());
+ builder.configure(ctx);
+ Interceptor interceptor = builder.build();
+
+ long originalTs = 1L;
+ Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
+ event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs));
+
+ Long now = System.currentTimeMillis();
+ event = interceptor.intercept(event);
+ Assert.assertEquals(Long.toString(originalTs),
+ event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
+ String timestampStr = event.getHeaders().get("timestampHeader");
+ Assert.assertNotNull(timestampStr);
+ Assert.assertTrue(Long.parseLong(timestampStr) >= now);
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index bbe9330..6183d9a 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -3956,15 +3956,16 @@ Timestamp Interceptor
~~~~~~~~~~~~~~~~~~~~~
This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor
-inserts a header with key ``timestamp`` whose value is the relevant timestamp. This interceptor
-can preserve an existing timestamp if it is already present in the configuration.
-
-================ ======= ========================================================================
-Property Name Default Description
-================ ======= ========================================================================
-**type** -- The component type name, has to be ``timestamp`` or the FQCN
-preserveExisting false If the timestamp already exists, should it be preserved - true or false
-================ ======= ========================================================================
+inserts a header with key ``timestamp`` (or as specified by the ``headerName`` property) whose value is the relevant timestamp.
+This interceptor can preserve an existing timestamp if it is already present in the event headers.
+
+================ ========= ========================================================================
+Property Name Default Description
+================ ========= ========================================================================
+**type** -- The component type name, has to be ``timestamp`` or the FQCN
+headerName timestamp The name of the header in which to place the generated timestamp.
+preserveExisting false If the timestamp already exists, should it be preserved - true or false
+================ ========= ========================================================================
Example for agent named a1: