You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/09/11 19:52:58 UTC

flume git commit: FLUME-1520. Timestamp interceptor should support custom headers

Repository: flume
Updated Branches:
  refs/heads/trunk 773555c5c -> 13771c905


FLUME-1520. Timestamp interceptor should support custom headers

This change adds a configuration parameter to the TimestampInterceptor
for the user to be able to define the name of the timestamp header.

Reviewers: Tristan Stevens, Attila Simon

(Hari Shreedharan, Tristan Stevens, Attila Simon via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13771c90
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13771c90
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13771c90

Branch: refs/heads/trunk
Commit: 13771c905316052d3e94aeb3b4a0d49a27c0f852
Parents: 773555c
Author: Denes Arvay <de...@apache.org>
Authored: Mon Sep 11 15:16:45 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Mon Sep 11 21:48:19 2017 +0200

----------------------------------------------------------------------
 .../flume/interceptor/TimestampInterceptor.java | 27 ++++++++-----
 .../interceptor/TestTimestampInterceptor.java   | 41 ++++++++++++++------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 19 ++++-----
 3 files changed, 56 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
index 50c3695..4ed6387 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
@@ -28,18 +28,22 @@ import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;
 /**
  * Simple Interceptor class that sets the current system timestamp on all events
  * that are intercepted.
- * By convention, this timestamp header is named "timestamp" and its format
+ * By convention, this timestamp header is named "timestamp" by default and its format
  * is a "stringified" long timestamp in milliseconds since the UNIX epoch.
+ * The name of the header can be changed through the configuration using the
+ * config key "header".
  */
 public class TimestampInterceptor implements Interceptor {
 
   private final boolean preserveExisting;
+  private final String header;
 
   /**
    * Only {@link TimestampInterceptor.Builder} can build me
    */
-  private TimestampInterceptor(boolean preserveExisting) {
+  private TimestampInterceptor(boolean preserveExisting, String header) {
     this.preserveExisting = preserveExisting;
+    this.header = header;
   }
 
   @Override
@@ -53,11 +57,11 @@ public class TimestampInterceptor implements Interceptor {
   @Override
   public Event intercept(Event event) {
     Map<String, String> headers = event.getHeaders();
-    if (preserveExisting && headers.containsKey(TIMESTAMP)) {
+    if (preserveExisting && headers.containsKey(header)) {
       // we must preserve the existing timestamp
     } else {
       long now = System.currentTimeMillis();
-      headers.put(TIMESTAMP, Long.toString(now));
+      headers.put(header, Long.toString(now));
     }
     return event;
   }
@@ -85,24 +89,27 @@ public class TimestampInterceptor implements Interceptor {
    */
   public static class Builder implements Interceptor.Builder {
 
-    private boolean preserveExisting = PRESERVE_DFLT;
+    private boolean preserveExisting = DEFAULT_PRESERVE;
+    private String header = DEFAULT_HEADER_NAME;
 
     @Override
     public Interceptor build() {
-      return new TimestampInterceptor(preserveExisting);
+      return new TimestampInterceptor(preserveExisting, header);
     }
 
     @Override
     public void configure(Context context) {
-      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
+      preserveExisting = context.getBoolean(CONFIG_PRESERVE, DEFAULT_PRESERVE);
+      header = context.getString(CONFIG_HEADER_NAME, DEFAULT_HEADER_NAME);
     }
 
   }
 
   public static class Constants {
-    public static String TIMESTAMP = "timestamp";
-    public static String PRESERVE = "preserveExisting";
-    public static boolean PRESERVE_DFLT = false;
+    public static final String CONFIG_PRESERVE = "preserveExisting";
+    public static final boolean DEFAULT_PRESERVE = false;
+    public static final String CONFIG_HEADER_NAME = "headerName";
+    public static final String DEFAULT_HEADER_NAME = "timestamp";
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
index 3d3eeee..06648c6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java
@@ -35,17 +35,16 @@ public class TestTimestampInterceptor {
   public void testBasic() throws ClassNotFoundException, InstantiationException,
       IllegalAccessException {
 
-    InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
     Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
         InterceptorType.TIMESTAMP.toString());
     Interceptor interceptor = builder.build();
 
     Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
-    Assert.assertNull(event.getHeaders().get(Constants.TIMESTAMP));
+    Assert.assertNull(event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
 
     Long now = System.currentTimeMillis();
     event = interceptor.intercept(event);
-    String timestampStr = event.getHeaders().get(Constants.TIMESTAMP);
+    String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME);
     Assert.assertNotNull(timestampStr);
     Assert.assertTrue(Long.parseLong(timestampStr) >= now);
   }
@@ -60,7 +59,6 @@ public class TestTimestampInterceptor {
     Context ctx = new Context();
     ctx.put("preserveExisting", "true");
 
-    InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
     Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
         InterceptorType.TIMESTAMP.toString());
     builder.configure(ctx);
@@ -68,13 +66,12 @@ public class TestTimestampInterceptor {
 
     long originalTs = 1L;
     Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
-    event.getHeaders().put(Constants.TIMESTAMP, Long.toString(originalTs));
+    event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs));
     Assert.assertEquals(Long.toString(originalTs),
-        event.getHeaders().get(Constants.TIMESTAMP));
+        event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
 
-    Long now = System.currentTimeMillis();
     event = interceptor.intercept(event);
-    String timestampStr = event.getHeaders().get(Constants.TIMESTAMP);
+    String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME);
     Assert.assertNotNull(timestampStr);
     Assert.assertTrue(Long.parseLong(timestampStr) == originalTs);
   }
@@ -89,7 +86,6 @@ public class TestTimestampInterceptor {
     Context ctx = new Context();
     ctx.put("preserveExisting", "false"); // DEFAULT BEHAVIOR
 
-    InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
     Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
         InterceptorType.TIMESTAMP.toString());
     builder.configure(ctx);
@@ -97,15 +93,36 @@ public class TestTimestampInterceptor {
 
     long originalTs = 1L;
     Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
-    event.getHeaders().put(Constants.TIMESTAMP, Long.toString(originalTs));
+    event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs));
     Assert.assertEquals(Long.toString(originalTs),
-        event.getHeaders().get(Constants.TIMESTAMP));
+        event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
 
     Long now = System.currentTimeMillis();
     event = interceptor.intercept(event);
-    String timestampStr = event.getHeaders().get(Constants.TIMESTAMP);
+    String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME);
     Assert.assertNotNull(timestampStr);
     Assert.assertTrue(Long.parseLong(timestampStr) >= now);
   }
 
+  @Test
+  public void testCustomHeader() throws Exception {
+    Context ctx = new Context();
+    ctx.put(TimestampInterceptor.Constants.CONFIG_HEADER_NAME, "timestampHeader");
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+            InterceptorType.TIMESTAMP.toString());
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    long originalTs = 1L;
+    Event event = EventBuilder.withBody("test event", Charsets.UTF_8);
+    event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs));
+
+    Long now = System.currentTimeMillis();
+    event = interceptor.intercept(event);
+    Assert.assertEquals(Long.toString(originalTs),
+            event.getHeaders().get(Constants.DEFAULT_HEADER_NAME));
+    String timestampStr = event.getHeaders().get("timestampHeader");
+    Assert.assertNotNull(timestampStr);
+    Assert.assertTrue(Long.parseLong(timestampStr) >= now);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index bbe9330..6183d9a 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -3956,15 +3956,16 @@ Timestamp Interceptor
 ~~~~~~~~~~~~~~~~~~~~~
 
 This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor
-inserts a header with key ``timestamp`` whose value is the relevant timestamp. This interceptor
-can preserve an existing timestamp if it is already present in the configuration.
-
-================  =======  ========================================================================
-Property Name     Default  Description
-================  =======  ========================================================================
-**type**          --       The component type name, has to be ``timestamp`` or the FQCN
-preserveExisting  false    If the timestamp already exists, should it be preserved - true or false
-================  =======  ========================================================================
+inserts a header with key ``timestamp`` (or as specified by the ``header`` property) whose value is the relevant timestamp.
+This interceptor can preserve an existing timestamp if it is already present in the configuration.
+
+================  =========  ========================================================================
+Property Name     Default    Description
+================  =========  ========================================================================
+**type**          --         The component type name, has to be ``timestamp`` or the FQCN
+header            timestamp  The name of the header in which to place the generated timestamp.
+preserveExisting  false      If the timestamp already exists, should it be preserved - true or false
+================  =========  ========================================================================
 
 Example for agent named a1: