You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2014/11/20 05:32:51 UTC

flume git commit: FLUME-2246. Make event data size configurable for logger sinker

Repository: flume
Updated Branches:
  refs/heads/flume-1.6 6ad8adae6 -> 4e50b16ab


FLUME-2246. Make event data size configurable for logger sinker

(Ashish Paliwal via Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4e50b16a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4e50b16a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4e50b16a

Branch: refs/heads/flume-1.6
Commit: 4e50b16abed9dfb03b8b870083a05d3320bcd38e
Parents: 6ad8ada
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Wed Nov 19 20:28:23 2014 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Wed Nov 19 20:32:55 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/sink/LoggerSink.java  | 32 +++++++++++++++++---
 .../org/apache/flume/sink/TestLoggerSink.java   | 23 ++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  1 +
 3 files changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/4e50b16a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
index 128fa84..9cf9bc2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
@@ -18,18 +18,20 @@
 
 package org.apache.flume.sink;
 
+import com.google.common.base.Strings;
 import org.apache.flume.Channel;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * <p>
- * A {@link Sink} implementation that logs all events received at the INFO level
+ * A {@link org.apache.flume.Sink} implementation that logs all events received at the INFO level
  * to the <tt>org.apache.flume.sink.LoggerSink</tt> logger.
  * </p>
  * <p>
@@ -49,11 +51,33 @@ import org.slf4j.LoggerFactory;
  * TODO
  * </p>
  */
-public class LoggerSink extends AbstractSink {
+public class LoggerSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(LoggerSink.class);
 
+  // Default maximum number of event body bytes to dump when nothing is configured
+  public static final int DEFAULT_MAX_BYTE_DUMP = 16;
+
+  // Maximum number of bytes handed to EventHelper.dumpEvent for each logged event
+  private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
+  // Name of the configuration property that configure(Context) reads to override maxBytesToLog
+  public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";
+
+  /** Reads the optional {@code maxBytesToLog} property; a non-integer value is reported and replaced by the default. */
+  @Override
+  public void configure(Context context) {
+    String strMaxBytes = context.getString(MAX_BYTES_DUMP_KEY);
+    if (!Strings.isNullOrEmpty(strMaxBytes)) { // unset or empty: leave the current limit untouched
+      try {
+        maxBytesToLog = Integer.parseInt(strMaxBytes);
+      } catch (NumberFormatException e) { // name the real property key so the warning is actionable
+        logger.warn(String.format("Unable to convert %s to integer, using default value(%d) for %s", strMaxBytes, DEFAULT_MAX_BYTE_DUMP, MAX_BYTES_DUMP_KEY));
+        maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
+      }
+    }
+  }
+
   @Override
   public Status process() throws EventDeliveryException {
     Status result = Status.READY;
@@ -67,7 +91,7 @@ public class LoggerSink extends AbstractSink {
 
       if (event != null) {
         if (logger.isInfoEnabled()) {
-          logger.info("Event: " + EventHelper.dumpEvent(event));
+          logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
         }
       } else {
         // No event found, request back-off semantics from the sink runner

http://git-wip-us.apache.org/repos/asf/flume/blob/4e50b16a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
index 92ff6fe..3257ced 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
@@ -19,6 +19,7 @@
 
 package org.apache.flume.sink;
 
+import com.google.common.base.Strings;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -56,6 +57,28 @@ public class TestLoggerSink {
 
     for (int i = 0; i < 10; i++) {
       Event event = EventBuilder.withBody(("Test " + i).getBytes());
+      channel.put(event);
+      sink.process();
+    }
+
+    sink.stop();
+  }
+
+  @Test
+  public void testAppendWithCustomSize() throws InterruptedException, LifecycleException,
+          EventDeliveryException {
+
+    Channel channel = new PseudoTxnMemoryChannel();
+    Context context = new Context();
+    context.put(LoggerSink.MAX_BYTES_DUMP_KEY, String.valueOf(30));
+    Configurables.configure(channel, context);
+    Configurables.configure(sink, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (int i = 0; i < 10; i++) {
+      Event event = EventBuilder.withBody((Strings.padStart("Test " + i, 30, 'P')).getBytes());
 
       channel.put(event);
       sink.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/4e50b16a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0199d62..bcadc2d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1732,6 +1732,7 @@ Property Name   Default  Description
 ==============  =======  ===========================================
 **channel**     --
 **type**        --       The component type name, needs to be ``logger``
+maxBytesToLog   16       Maximum number of bytes of the Event body to log
 ==============  =======  ===========================================
 
 Example for agent named a1: