You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2014/11/20 05:32:51 UTC
flume git commit: FLUME-2246. Make event data size configurable for
logger sinker
Repository: flume
Updated Branches:
refs/heads/flume-1.6 6ad8adae6 -> 4e50b16ab
FLUME-2246. Make event data size configurable for logger sinker
(Ashish Paliwal via Roshan Naik)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4e50b16a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4e50b16a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4e50b16a
Branch: refs/heads/flume-1.6
Commit: 4e50b16abed9dfb03b8b870083a05d3320bcd38e
Parents: 6ad8ada
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Wed Nov 19 20:28:23 2014 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Wed Nov 19 20:32:55 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/flume/sink/LoggerSink.java | 32 +++++++++++++++++---
.../org/apache/flume/sink/TestLoggerSink.java | 23 ++++++++++++++
flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 +
3 files changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/4e50b16a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
index 128fa84..9cf9bc2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
@@ -18,18 +18,20 @@
package org.apache.flume.sink;
+import com.google.common.base.Strings;
import org.apache.flume.Channel;
+import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
- * A {@link Sink} implementation that logs all events received at the INFO level
+ * A {@link org.apache.flume.Sink} implementation that logs all events received at the INFO level
* to the <tt>org.apache.flume.sink.LoggerSink</tt> logger.
* </p>
* <p>
@@ -49,11 +51,33 @@ import org.slf4j.LoggerFactory;
* TODO
* </p>
*/
-public class LoggerSink extends AbstractSink {
+public class LoggerSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(LoggerSink.class);
+ // Default Max bytes to dump
+ public static final int DEFAULT_MAX_BYTE_DUMP = 16;
+
+ // Max number of bytes to be dumped
+ private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
+
+ public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";
+
+ @Override
+ public void configure(Context context) {
+ String strMaxBytes = context.getString(MAX_BYTES_DUMP_KEY);
+ if (!Strings.isNullOrEmpty(strMaxBytes)) {
+ try {
+ maxBytesToLog = Integer.parseInt(strMaxBytes);
+ } catch (NumberFormatException e) {
+ logger.warn(String.format("Unable to convert %s to integer, using default value(%d) for maxByteToDump",
+ strMaxBytes, DEFAULT_MAX_BYTE_DUMP));
+ maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
+ }
+ }
+ }
+
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
@@ -67,7 +91,7 @@ public class LoggerSink extends AbstractSink {
if (event != null) {
if (logger.isInfoEnabled()) {
- logger.info("Event: " + EventHelper.dumpEvent(event));
+ logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
}
} else {
// No event found, request back-off semantics from the sink runner
http://git-wip-us.apache.org/repos/asf/flume/blob/4e50b16a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
index 92ff6fe..3257ced 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
@@ -19,6 +19,7 @@
package org.apache.flume.sink;
+import com.google.common.base.Strings;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -56,6 +57,28 @@ public class TestLoggerSink {
for (int i = 0; i < 10; i++) {
Event event = EventBuilder.withBody(("Test " + i).getBytes());
+ channel.put(event);
+ sink.process();
+ }
+
+ sink.stop();
+ }
+
+ @Test
+ public void testAppendWithCustomSize() throws InterruptedException, LifecycleException,
+ EventDeliveryException {
+
+ Channel channel = new PseudoTxnMemoryChannel();
+ Context context = new Context();
+ context.put(LoggerSink.MAX_BYTES_DUMP_KEY, String.valueOf(30));
+ Configurables.configure(channel, context);
+ Configurables.configure(sink, context);
+
+ sink.setChannel(channel);
+ sink.start();
+
+ for (int i = 0; i < 10; i++) {
+ Event event = EventBuilder.withBody((Strings.padStart("Test " + i, 30, 'P')).getBytes());
channel.put(event);
sink.process();
http://git-wip-us.apache.org/repos/asf/flume/blob/4e50b16a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0199d62..bcadc2d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1732,6 +1732,7 @@ Property Name Default Description
============== ======= ===========================================
**channel** --
**type** -- The component type name, needs to be ``logger``
+maxBytesToLog 16 Maximum number of bytes of the Event body to log
============== ======= ===========================================
Example for agent named a1: