You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/10/21 21:15:58 UTC
git commit: FLUME-1666. Oops, forgot new test in previous commit
Updated Branches:
refs/heads/trunk 8db5de8f8 -> 730c822c8
FLUME-1666. Oops, forgot new test in previous commit
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/730c822c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/730c822c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/730c822c
Branch: refs/heads/trunk
Commit: 730c822c8fd3c393558ee63b48c82bb5a0763266
Parents: 8db5de8
Author: Mike Percy <mp...@cloudera.com>
Authored: Mon Oct 21 12:04:22 2013 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Mon Oct 21 12:14:33 2013 -0700
----------------------------------------------------------------------
.../flume/source/TestSyslogTcpSource.java | 136 +++++++++++++++++++
1 file changed, 136 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/730c822c/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
new file mode 100644
index 0000000..a6a1d5b
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSyslogTcpSource {
+ private static final org.slf4j.Logger logger =
+ LoggerFactory.getLogger(TestSyslogTcpSource.class);
+ private SyslogTcpSource source;
+ private Channel channel;
+ private static final int TEST_SYSLOG_PORT = 0;
+ private final DateTime time = new DateTime();
+ private final String stamp1 = time.toString();
+ private final String host1 = "localhost.localdomain";
+ private final String data1 = "test syslog data";
+ private final String bodyWithTandH = stamp1 + " " + host1 + " " + data1;
+ // Helper function to generate a syslog message.
+ private byte[] getEvent() {
+ // timestamp with 'Z' appended, translates to UTC
+ final String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n";
+ return msg1.getBytes();
+ }
+
+ private void init(boolean keepFields){
+ source = new SyslogTcpSource();
+ channel = new MemoryChannel();
+
+ Configurables.configure(channel, new Context());
+
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ ChannelSelector rcs = new ReplicatingChannelSelector();
+ rcs.setChannels(channels);
+
+ source.setChannelProcessor(new ChannelProcessor(rcs));
+ Context context = new Context();
+ context.put("port", String.valueOf(TEST_SYSLOG_PORT));
+ context.put("keepFields", String.valueOf(keepFields));
+
+ source.configure(context);
+
+ }
+ /** Tests the keepFields configuration parameter (enabled or disabled)
+ using SyslogTcpSource.*/
+ private void runKeepFieldsTest(boolean keepFields) throws IOException {
+ init(keepFields);
+ source.start();
+ // Write some message to the syslog port
+ Socket syslogSocket;
+ for (int i = 0; i < 10 ; i++) {
+ syslogSocket = new Socket(
+ InetAddress.getLocalHost(), source.getSourcePort());
+ syslogSocket.getOutputStream().write(getEvent());
+ syslogSocket.close();
+ }
+
+ List<Event> channelEvents = new ArrayList<Event>();
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int i = 0; i < 10; i++) {
+ Event e = channel.take();
+ if (e == null) {
+ throw new NullPointerException("Event is null");
+ }
+ channelEvents.add(e);
+ }
+
+ try {
+ txn.commit();
+ } catch (Throwable t) {
+ txn.rollback();
+ } finally {
+ txn.close();
+ }
+
+ source.stop();
+ for (Event e : channelEvents) {
+ Assert.assertNotNull(e);
+ String str = new String(e.getBody(), Charsets.UTF_8);
+ logger.info(str);
+ if (keepFields) {
+ Assert.assertArrayEquals(bodyWithTandH.getBytes(), e.getBody());
+ } else if (!keepFields) {
+ Assert.assertArrayEquals(data1.getBytes(), e.getBody());
+ }
+ }
+ }
+
+ @Test
+ public void testKeepFields () throws IOException {
+ runKeepFieldsTest(true);
+ }
+
+ @Test
+ public void testRemoveFields() throws IOException{
+ runKeepFieldsTest(false);
+ }
+ }
+