You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/10/10 03:50:46 UTC

git commit: FLUME-1666. Syslog source strips timestamp and hostname from log message body

Updated Branches:
  refs/heads/trunk 02fc1a8cf -> 1f95219ea


FLUME-1666. Syslog source strips timestamp and hostname from log message body

(Jeff Lord via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1f95219e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1f95219e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1f95219e

Branch: refs/heads/trunk
Commit: 1f95219ea6f87173018bde126a3485575a8ee252
Parents: 02fc1a8
Author: Mike Percy <mp...@cloudera.com>
Authored: Wed Oct 9 18:49:31 2013 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Wed Oct 9 18:49:31 2013 -0700

----------------------------------------------------------------------
 .../SyslogSourceConfigurationConstants.java     |  3 +++
 .../apache/flume/source/SyslogTcpSource.java    | 20 ++++++++++++++++++++
 .../org/apache/flume/source/SyslogUtils.java    | 19 +++++++++++++++----
 .../flume/source/TestSyslogUdpSource.java       |  4 ++++
 .../apache/flume/source/TestSyslogUtils.java    |  5 +++--
 flume-ng-doc/sphinx/FlumeDeveloperGuide.rst     |  2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  2 ++
 7 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
index 5a73c88..985949c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
@@ -66,6 +66,9 @@ public final class SyslogSourceConfigurationConstants {
   public static final String CONFIG_READBUF_SIZE = "readBufferBytes";
   public static final int DEFAULT_READBUF_SIZE = 1024;
 
+  public static final String CONFIG_KEEP_FIELDS = "keepFields";
+  public static final boolean DEFAULT_KEEP_FIELDS = false;
+
   private SyslogSourceConfigurationConstants() {
     // Disable explicit creation of objects.
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index db9e0fd..7a12d27 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -19,10 +19,12 @@
 package org.apache.flume.source;
 
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
@@ -56,6 +58,7 @@ implements EventDrivenSource, Configurable {
   private Integer eventSize;
   private Map<String, String> formaterProp;
   private CounterGroup counterGroup = new CounterGroup();
+  private Boolean keepFields;
 
   public class syslogTcpHandler extends SimpleChannelHandler {
 
@@ -65,6 +68,10 @@ implements EventDrivenSource, Configurable {
       syslogUtils.setEventSize(eventSize);
     }
 
+    public void setKeepFields(boolean removeFields){
+      syslogUtils.setKeepFields(removeFields);
+    }
+
     public void setFormater(Map<String, String> prop) {
       syslogUtils.addFormats(prop);
     }
@@ -103,6 +110,7 @@ implements EventDrivenSource, Configurable {
         syslogTcpHandler handler = new syslogTcpHandler();
         handler.setEventSize(eventSize);
         handler.setFormater(formaterProp);
+        handler.setKeepFields(keepFields);
         return Channels.pipeline(handler);
       }
     });
@@ -146,6 +154,18 @@ implements EventDrivenSource, Configurable {
     eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
     formaterProp = context.getSubProperties(
         SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
+    keepFields = context.getBoolean
+      (SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, false);
+  }
+
+  @VisibleForTesting
+  public int getSourcePort() {
+    SocketAddress localAddress = nettyChannel.getLocalAddress();
+    if (localAddress instanceof InetSocketAddress) {
+      InetSocketAddress addr = (InetSocketAddress) localAddress;
+      return addr.getPort();
+    }
+    return 0;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index c2a29a1..f2ea932 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -20,6 +20,8 @@
 package org.apache.flume.source;
 
 import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.event.EventBuilder;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
@@ -37,6 +39,8 @@ import java.util.regex.MatchResult;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class SyslogUtils {
   final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ";
   final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S";
@@ -79,6 +83,7 @@ public class SyslogUtils {
   private boolean isBadEvent;
   private boolean isIncompleteEvent;
   private Integer maxSize;
+  private boolean keepFields;
 
   private class SyslogFormatter {
     public Pattern regexPattern;
@@ -98,15 +103,16 @@ public class SyslogUtils {
   }
 
   public SyslogUtils(boolean isUdp) {
-    this(DEFAULT_SIZE, isUdp);
+    this(DEFAULT_SIZE, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, isUdp);
   }
 
-  public SyslogUtils(Integer eventSize, boolean isUdp){
+  public SyslogUtils(Integer eventSize, boolean keepFields, boolean isUdp) {
     this.isUdp = isUdp;
     isBadEvent = false;
     isIncompleteEvent = false;
     maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize;
     baos = new ByteArrayOutputStream(eventSize);
+    this.keepFields = keepFields;
     initHeaderFormats();
   }
 
@@ -219,7 +225,7 @@ public class SyslogUtils {
       headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
     }
 
-    if ((msgBody != null) && (msgBody.length() > 0)) {
+    if ((msgBody != null) && (msgBody.length() > 0) && !keepFields) {
       body = msgBody.getBytes();
     } else {
       body = baos.toByteArray();
@@ -380,4 +386,9 @@ public class SyslogUtils {
     this.maxSize = eventSize;
   }
 
-}
+  public void setKeepFields(Boolean keepFields) {
+    this.keepFields= keepFields;
+  }
+  }
+
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index 2d7a429..eae26ed 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -20,6 +20,8 @@ package org.apache.flume.source;
 
 import java.util.ArrayList;
 import java.util.List;
+
+import com.google.common.base.Charsets;
 import org.apache.log4j.Logger;
 import org.apache.log4j.net.SyslogAppender;
 
@@ -91,6 +93,8 @@ public class TestSyslogUdpSource {
     source.stop();
     logger.removeAppender(appender);
 
+    String str = new String(e.getBody(), Charsets.UTF_8);
+    logger.info(str);
     Assert.assertNotNull(e);
     Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8),
         e.getHeaders().get(SyslogUtils.SYSLOG_FACILITY));

http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 7208464..898096b 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -162,7 +162,8 @@ public class TestSyslogUtils {
             format1, host1, data1);
   }
 
-  public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException {
+  public void checkHeader(String msg1, String stamp1, String format1,
+      String host1, String data1) throws ParseException {
     SyslogUtils util = new SyslogUtils(false);
     ChannelBuffer buff = ChannelBuffers.buffer(200);
 
@@ -397,7 +398,7 @@ public class TestSyslogUtils {
   @Test
   public void testExtractBadEventLarge() {
     String badData1 = "<10> bad bad data bad bad\n";
-    SyslogUtils util = new SyslogUtils(5, false);
+    SyslogUtils util = new SyslogUtils(5, true, false);
     ChannelBuffer buff = ChannelBuffers.buffer(100);
     buff.writeBytes(badData1.getBytes());
     Event e = util.extractEvent(buff);

http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index 2be9c68..ee7b89b 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -166,7 +166,7 @@ RPC clients - Avro and Thrift
 As of Flume 1.4.0, Avro is the default RPC protocol.  The
 ``NettyAvroRpcClient`` and ``ThriftRpcClient`` implement the ``RpcClient``
 interface. The client needs to create this object with the host and port of
-the target Flume agent, and canthen use the ``RpcClient`` to send data into
+the target Flume agent, and can then use the ``RpcClient`` to send data into
 the agent. The following example shows how to use the Flume Client SDK API
 within a user's data-generating application:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 4892dfc..98859ce 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1170,6 +1170,8 @@ Property Name    Default      Description
 **host**         --           Host name or IP address to bind to
 **port**         --           Port # to bind to
 eventSize        2500         Maximum size of a single event line, in bytes
+keepFields       false        Setting this to true will preserve the
+                              timestamp and hostname in the body of the event.
 selector.type                 replicating or multiplexing
 selector.*       replicating  Depends on the selector.type value
 interceptors     --           Space-separated list of interceptors