You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/10/10 03:50:46 UTC
git commit: FLUME-1666. Syslog source strips timestamp and hostname
from log message body
Updated Branches:
refs/heads/trunk 02fc1a8cf -> 1f95219ea
FLUME-1666. Syslog source strips timestamp and hostname from log message body
(Jeff Lord via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1f95219e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1f95219e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1f95219e
Branch: refs/heads/trunk
Commit: 1f95219ea6f87173018bde126a3485575a8ee252
Parents: 02fc1a8
Author: Mike Percy <mp...@cloudera.com>
Authored: Wed Oct 9 18:49:31 2013 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Wed Oct 9 18:49:31 2013 -0700
----------------------------------------------------------------------
.../SyslogSourceConfigurationConstants.java | 3 +++
.../apache/flume/source/SyslogTcpSource.java | 20 ++++++++++++++++++++
.../org/apache/flume/source/SyslogUtils.java | 19 +++++++++++++++----
.../flume/source/TestSyslogUdpSource.java | 4 ++++
.../apache/flume/source/TestSyslogUtils.java | 5 +++--
flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 2 +-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 ++
7 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
index 5a73c88..985949c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
@@ -66,6 +66,9 @@ public final class SyslogSourceConfigurationConstants {
public static final String CONFIG_READBUF_SIZE = "readBufferBytes";
public static final int DEFAULT_READBUF_SIZE = 1024;
+ public static final String CONFIG_KEEP_FIELDS = "keepFields";
+ public static final boolean DEFAULT_KEEP_FIELDS = false;
+
private SyslogSourceConfigurationConstants() {
// Disable explicit creation of objects.
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index db9e0fd..7a12d27 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -19,10 +19,12 @@
package org.apache.flume.source;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
@@ -56,6 +58,7 @@ implements EventDrivenSource, Configurable {
private Integer eventSize;
private Map<String, String> formaterProp;
private CounterGroup counterGroup = new CounterGroup();
+ private Boolean keepFields;
public class syslogTcpHandler extends SimpleChannelHandler {
@@ -65,6 +68,10 @@ implements EventDrivenSource, Configurable {
syslogUtils.setEventSize(eventSize);
}
+ public void setKeepFields(boolean removeFields){
+ syslogUtils.setKeepFields(removeFields);
+ }
+
public void setFormater(Map<String, String> prop) {
syslogUtils.addFormats(prop);
}
@@ -103,6 +110,7 @@ implements EventDrivenSource, Configurable {
syslogTcpHandler handler = new syslogTcpHandler();
handler.setEventSize(eventSize);
handler.setFormater(formaterProp);
+ handler.setKeepFields(keepFields);
return Channels.pipeline(handler);
}
});
@@ -146,6 +154,18 @@ implements EventDrivenSource, Configurable {
eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
formaterProp = context.getSubProperties(
SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
+ keepFields = context.getBoolean
+ (SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, false);
+ }
+
+ @VisibleForTesting
+ public int getSourcePort() {
+ SocketAddress localAddress = nettyChannel.getLocalAddress();
+ if (localAddress instanceof InetSocketAddress) {
+ InetSocketAddress addr = (InetSocketAddress) localAddress;
+ return addr.getPort();
+ }
+ return 0;
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index c2a29a1..f2ea932 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -20,6 +20,8 @@
package org.apache.flume.source;
import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.event.EventBuilder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
@@ -37,6 +39,8 @@ import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
public class SyslogUtils {
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ";
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S";
@@ -79,6 +83,7 @@ public class SyslogUtils {
private boolean isBadEvent;
private boolean isIncompleteEvent;
private Integer maxSize;
+ private boolean keepFields;
private class SyslogFormatter {
public Pattern regexPattern;
@@ -98,15 +103,16 @@ public class SyslogUtils {
}
public SyslogUtils(boolean isUdp) {
- this(DEFAULT_SIZE, isUdp);
+ this(DEFAULT_SIZE, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, isUdp);
}
- public SyslogUtils(Integer eventSize, boolean isUdp){
+ public SyslogUtils(Integer eventSize, boolean keepFields, boolean isUdp) {
this.isUdp = isUdp;
isBadEvent = false;
isIncompleteEvent = false;
maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize;
baos = new ByteArrayOutputStream(eventSize);
+ this.keepFields = keepFields;
initHeaderFormats();
}
@@ -219,7 +225,7 @@ public class SyslogUtils {
headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
}
- if ((msgBody != null) && (msgBody.length() > 0)) {
+ if ((msgBody != null) && (msgBody.length() > 0) && !keepFields) {
body = msgBody.getBytes();
} else {
body = baos.toByteArray();
@@ -380,4 +386,9 @@ public class SyslogUtils {
this.maxSize = eventSize;
}
-}
+ public void setKeepFields(Boolean keepFields) {
+ this.keepFields= keepFields;
+ }
+ }
+
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index 2d7a429..eae26ed 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -20,6 +20,8 @@ package org.apache.flume.source;
import java.util.ArrayList;
import java.util.List;
+
+import com.google.common.base.Charsets;
import org.apache.log4j.Logger;
import org.apache.log4j.net.SyslogAppender;
@@ -91,6 +93,8 @@ public class TestSyslogUdpSource {
source.stop();
logger.removeAppender(appender);
+ String str = new String(e.getBody(), Charsets.UTF_8);
+ logger.info(str);
Assert.assertNotNull(e);
Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8),
e.getHeaders().get(SyslogUtils.SYSLOG_FACILITY));
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 7208464..898096b 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -162,7 +162,8 @@ public class TestSyslogUtils {
format1, host1, data1);
}
- public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException {
+ public void checkHeader(String msg1, String stamp1, String format1,
+ String host1, String data1) throws ParseException {
SyslogUtils util = new SyslogUtils(false);
ChannelBuffer buff = ChannelBuffers.buffer(200);
@@ -397,7 +398,7 @@ public class TestSyslogUtils {
@Test
public void testExtractBadEventLarge() {
String badData1 = "<10> bad bad data bad bad\n";
- SyslogUtils util = new SyslogUtils(5, false);
+ SyslogUtils util = new SyslogUtils(5, true, false);
ChannelBuffer buff = ChannelBuffers.buffer(100);
buff.writeBytes(badData1.getBytes());
Event e = util.extractEvent(buff);
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index 2be9c68..ee7b89b 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -166,7 +166,7 @@ RPC clients - Avro and Thrift
As of Flume 1.4.0, Avro is the default RPC protocol. The
``NettyAvroRpcClient`` and ``ThriftRpcClient`` implement the ``RpcClient``
interface. The client needs to create this object with the host and port of
-the target Flume agent, and canthen use the ``RpcClient`` to send data into
+the target Flume agent, and can then use the ``RpcClient`` to send data into
the agent. The following example shows how to use the Flume Client SDK API
within a user's data-generating application:
http://git-wip-us.apache.org/repos/asf/flume/blob/1f95219e/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 4892dfc..98859ce 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1170,6 +1170,8 @@ Property Name Default Description
**host** -- Host name or IP address to bind to
**port** -- Port # to bind to
eventSize 2500 Maximum size of a single event line, in bytes
+keepFields false Setting this to true will preserve the
+ Timestamp and Hostname in the body of the event.
selector.type replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors -- Space-separated list of interceptors