You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/05 00:11:05 UTC

[1/4] nifi git commit: NIFI-274 Adding ListenSyslog and PutSyslog to standard processors. - Refactoring connection handling on put side, removing number of buffers from properties and basing it off concurrent tasks for the processor. - Refactoring some o

Repository: nifi
Updated Branches:
  refs/heads/master 201eac052 -> 618f22e11


http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
new file mode 100644
index 0000000..40a9123
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPutSyslog {
+
+    private MockCollectingSender sender;
+    private MockPutSyslog proc;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws IOException {
+        this.sender = new MockCollectingSender(); // keeps every message passed to send()
+        this.proc = new MockPutSyslog(this.sender); // createSender() always returns that collector
+        this.runner = TestRunners.newTestRunner(this.proc);
+        this.runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+        this.runner.setProperty(PutSyslog.PORT, "12345");
+    }
+
+    @Test
+    public void testValidMessageStaticPropertiesUdp() {
+        final String priority = "34";
+        final String protocolVersion = "1";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+        final String expected = String.format("<%s>%s %s %s %s", priority, protocolVersion, timestamp, hostname, msgBody);
+
+        // PROTOCOL is not set in this test, so whatever default the processor declares is exercised
+        runner.setProperty(PutSyslog.MSG_PRIORITY, priority);
+        runner.setProperty(PutSyslog.MSG_VERSION, protocolVersion);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, timestamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, hostname);
+        runner.setProperty(PutSyslog.MSG_BODY, msgBody);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expected, sender.messages.get(0));
+    }
+
+    @Test
+    public void testValidMessageStaticPropertiesTcp() {
+        final String priority = "34";
+        final String protocolVersion = "1";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+        final String expected = String.format("<%s>%s %s %s %s", priority, protocolVersion, timestamp, hostname, msgBody);
+
+        runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
+        runner.setProperty(PutSyslog.MSG_PRIORITY, priority);
+        runner.setProperty(PutSyslog.MSG_VERSION, protocolVersion);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, timestamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, hostname);
+        runner.setProperty(PutSyslog.MSG_BODY, msgBody);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        // any newline characters in the recorded message are stripped before it is compared
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expected, sender.messages.get(0).replace("\n", ""));
+    }
+
+    @Test
+    public void testValidMessageStaticPropertiesNoVersion() {
+        final String priority = "34";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+        final String expected = String.format("<%s>%s %s %s", priority, timestamp, hostname, msgBody);
+
+        // MSG_VERSION is deliberately not set, so the expected message carries no version token
+        runner.setProperty(PutSyslog.MSG_PRIORITY, priority);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, timestamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, hostname);
+        runner.setProperty(PutSyslog.MSG_BODY, msgBody);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expected, sender.messages.get(0));
+    }
+
+    @Test
+    public void testValidMessageELProperties() {
+        final String priority = "34";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+        final String expected = String.format("<%s>%s %s %s", priority, timestamp, hostname, msgBody);
+
+        // every message field is an expression that refers to a flow file attribute set below
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("syslog.priority", priority);
+        flowFileAttributes.put("syslog.timestamp", timestamp);
+        flowFileAttributes.put("syslog.hostname", hostname);
+        flowFileAttributes.put("syslog.body", msgBody);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), flowFileAttributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expected, sender.messages.get(0));
+    }
+
+    @Test
+    public void testInvalidMessageELProperties() {
+        final String priority = "34";
+        final String badTimestamp = "not-a-timestamp";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("syslog.priority", priority);
+        flowFileAttributes.put("syslog.timestamp", badTimestamp);
+        flowFileAttributes.put("syslog.hostname", hostname);
+        flowFileAttributes.put("syslog.body", msgBody);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), flowFileAttributes);
+        runner.run();
+        // the flow file must be routed to REL_INVALID and nothing may reach the sender
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1);
+        Assert.assertEquals(0, sender.messages.size());
+    }
+
+    @Test
+    public void testIOExceptionOnSend() throws IOException {
+        final String priority = "34";
+        final String protocolVersion = "1";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        this.proc = new MockPutSyslog(new MockErrorSender()); // replaces the setup() processor with one whose sender always throws
+        this.runner = TestRunners.newTestRunner(this.proc);
+        runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.PORT, "12345");
+        runner.setProperty(PutSyslog.MSG_PRIORITY, priority);
+        runner.setProperty(PutSyslog.MSG_VERSION, protocolVersion);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, timestamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, hostname);
+        runner.setProperty(PutSyslog.MSG_BODY, msgBody);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1);
+        Assert.assertEquals(0, sender.messages.size()); // NOTE(review): 'sender' is the setup() collector, which this processor never uses
+    }
+
+    @Test
+    public void testIOExceptionCreatingConnection() throws IOException {
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final Processor creationErrorProc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1); // distinct name: no longer shadows the 'proc' field
+        runner = TestRunners.newTestRunner(creationErrorProc);
+        runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.PORT, "12345");
+        runner.setProperty(PutSyslog.BATCH_SIZE, "1");
+        runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        runner.setProperty(PutSyslog.MSG_VERSION, version);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        runner.setProperty(PutSyslog.MSG_BODY, body);
+
+        // the first run will throw IOException when calling send so the connection won't be re-queued
+        // the second run will try to create a new connection but throw an exception which should be caught and route files to failure
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run(2);
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2);
+        Assert.assertEquals(0, sender.messages.size());
+    }
+
+    @Test
+    public void testLargeMessageFailure() {
+        final String priority = "34";
+        final String timestamp = "2015-10-15T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+
+        final StringBuilder oversizedBody = new StringBuilder(4096);
+        while (oversizedBody.length() < 4096) {
+            oversizedBody.append('a');
+        }
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("syslog.priority", priority);
+        flowFileAttributes.put("syslog.timestamp", timestamp);
+        flowFileAttributes.put("syslog.hostname", hostname);
+        flowFileAttributes.put("syslog.body", oversizedBody.toString());
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), flowFileAttributes);
+        runner.run();
+
+        // despite the test name, success is expected: a larger buffer should have been created dynamically
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+    }
+
+    @Test
+    public void testNoIncomingData() {
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "10");
+        runner.setProperty(PutSyslog.MSG_VERSION, "1");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.MSG_BODY, "test");
+        // a single flow file is queued but the processor is triggered five times,
+        // so the later triggers run with nothing left in the queue
+        final byte[] content = "incoming data".getBytes(Charset.forName("UTF-8"));
+        runner.enqueue(content);
+        runner.run(5);
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testBatchingFlowFiles() {
+        runner.setProperty(PutSyslog.BATCH_SIZE, "10");
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("syslog.priority", "10");
+        flowFileAttributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z");
+        flowFileAttributes.put("syslog.hostname", "my.host.name");
+        flowFileAttributes.put("syslog.body", "blah blah blah");
+
+        for (int remaining = 15; remaining > 0; remaining--) {
+            runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), flowFileAttributes);
+        }
+
+        runner.run(); // first trigger: one full batch of 10 out of the 15 queued
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10);
+        Assert.assertEquals(10, sender.messages.size());
+
+        runner.run(); // second trigger: the remaining 5, checked as a running total of 15
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15);
+        Assert.assertEquals(15, sender.messages.size());
+    }
+
+    // PutSyslog stub whose createSender() ignores its arguments and hands back the sender given at construction
+    static class MockPutSyslog extends PutSyslog {
+
+        ChannelSender mockSender;
+
+        public MockPutSyslog(ChannelSender senderToReturn) {
+            mockSender = senderToReturn;
+        }
+
+        @Override
+        protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+            return this.mockSender;
+        }
+    }
+
+    // PutSyslog stub that hands out its sender a limited number of times, then fails every further createSender() call
+    static class MockCreationErrorPutSyslog extends PutSyslog {
+
+        int numSendersCreated;
+        int numSendersAllowed;
+        ChannelSender mockSender;
+
+        public MockCreationErrorPutSyslog(ChannelSender senderToReturn, int maxSenders) {
+            mockSender = senderToReturn;
+            numSendersAllowed = maxSenders;
+        }
+
+        @Override
+        protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+            if (numSendersCreated < numSendersAllowed) {
+                numSendersCreated++;
+                return mockSender;
+            }
+            throw new IOException("too many senders");
+        }
+    }
+
+    // Sender stub that records each message given to send() and never touches a real channel
+    static class MockCollectingSender extends PutSyslog.ChannelSender {
+
+        List<String> messages = new ArrayList<>();
+
+        public MockCollectingSender() throws IOException {
+            super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8"));
+            bufferPool.offer(ByteBuffer.allocate(1024));
+        }
+
+        @Override
+        public void send(String msg) throws IOException {
+            this.messages.add(msg);
+            super.send(msg);
+        }
+
+        @Override
+        void write(ByteBuffer ignored) throws IOException {
+            // intentionally empty: nothing is written anywhere
+        }
+
+        @Override
+        boolean isConnected() {
+            return true;
+        }
+
+        @Override
+        void close() {
+            // intentionally empty: there is nothing to release
+        }
+    }
+
+    // Sender stub that reports itself as disconnected and fails every send() and write() with an IOException
+    static class MockErrorSender extends PutSyslog.ChannelSender {
+
+        public MockErrorSender() throws IOException {
+            super(null, 0, null, null);
+        }
+
+        @Override
+        public void send(String msg) throws IOException {
+            throw new IOException("error");
+        }
+
+        @Override
+        void write(ByteBuffer ignored) throws IOException {
+            throw new IOException("error");
+        }
+
+        @Override
+        boolean isConnected() {
+            return false;
+        }
+
+        @Override
+        void close() {
+            // intentionally empty: there is nothing to release
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
new file mode 100644
index 0000000..d1b1bbf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSyslogParser {
+
+    static final Charset CHARSET = Charset.forName("UTF-8");
+
+    private SyslogParser parser;
+
+    @Before
+    public void setup() {
+        this.parser = new SyslogParser(CHARSET); // new parser for each test, built with the class-wide charset
+    }
+
+    @Test
+    public void testRFC3164SingleDigitDay() {
+        final String priority = "10";
+        final String timestamp = "Oct  1 13:14:04";
+        final String hostname = "my.host.com";
+        final String msgBody = "some body message";
+        final String rawMessage = String.format("<%s>%s %s %s", priority, timestamp, hostname, msgBody);
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals(priority, parsed.getPriority());
+        Assert.assertEquals("2", parsed.getSeverity()); // 10 mod 8
+        Assert.assertEquals("1", parsed.getFacility()); // 10 div 8
+        Assert.assertNull(parsed.getVersion());
+        Assert.assertEquals(timestamp, parsed.getTimeStamp());
+        Assert.assertEquals(hostname, parsed.getHostName());
+        Assert.assertEquals(msgBody, parsed.getMsgBody());
+        Assert.assertEquals(rawMessage, parsed.getFullMessage());
+        Assert.assertTrue(parsed.isValid());
+    }
+
+    @Test
+    public void testRFC3164DoubleDigitDay() {
+        final String priority = "31";
+        final String timestamp = "Oct 13 14:14:43";
+        final String hostname = "localhost";
+        final String msgBody = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
+        final String rawMessage = String.format("<%s>%s %s %s", priority, timestamp, hostname, msgBody);
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals(priority, parsed.getPriority());
+        Assert.assertEquals("7", parsed.getSeverity()); // 31 mod 8
+        Assert.assertEquals("3", parsed.getFacility()); // 31 div 8
+        Assert.assertNull(parsed.getVersion());
+        Assert.assertEquals(timestamp, parsed.getTimeStamp());
+        Assert.assertEquals(hostname, parsed.getHostName());
+        Assert.assertEquals(msgBody, parsed.getMsgBody());
+        Assert.assertEquals(rawMessage, parsed.getFullMessage());
+        Assert.assertTrue(parsed.isValid());
+    }
+
+    @Test
+    public void testRFC3164WithVersion() {
+        final String priority = "31";
+        final String protocolVersion = "1";
+        final String timestamp = "Oct 13 14:14:43";
+        final String hostname = "localhost";
+        final String msgBody = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
+        final String rawMessage = String.format("<%s>%s %s %s %s", priority, protocolVersion, timestamp, hostname, msgBody);
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals(priority, parsed.getPriority());
+        Assert.assertEquals("7", parsed.getSeverity()); // 31 mod 8
+        Assert.assertEquals("3", parsed.getFacility()); // 31 div 8
+        Assert.assertEquals(protocolVersion, parsed.getVersion());
+        Assert.assertEquals(timestamp, parsed.getTimeStamp());
+        Assert.assertEquals(hostname, parsed.getHostName());
+        Assert.assertEquals(msgBody, parsed.getMsgBody());
+        Assert.assertEquals(rawMessage, parsed.getFullMessage());
+        Assert.assertTrue(parsed.isValid());
+    }
+
+    @Test
+    public void testRFC5424WithVersion() {
+        final String priority = "34";
+        final String protocolVersion = "1";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String rawMessage = String.format("<%s>%s %s %s %s", priority, protocolVersion, timestamp, hostname, msgBody);
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals(priority, parsed.getPriority());
+        Assert.assertEquals("2", parsed.getSeverity()); // 34 mod 8
+        Assert.assertEquals("4", parsed.getFacility()); // 34 div 8
+        Assert.assertEquals(protocolVersion, parsed.getVersion());
+        Assert.assertEquals(timestamp, parsed.getTimeStamp());
+        Assert.assertEquals(hostname, parsed.getHostName());
+        Assert.assertEquals(msgBody, parsed.getMsgBody());
+        Assert.assertEquals(rawMessage, parsed.getFullMessage());
+        Assert.assertTrue(parsed.isValid());
+    }
+
+    @Test
+    public void testRFC5424WithoutVersion() {
+        final String priority = "34";
+        final String timestamp = "2003-10-11T22:14:15.003Z";
+        final String hostname = "mymachine.example.com";
+        final String msgBody = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String rawMessage = String.format("<%s>%s %s %s", priority, timestamp, hostname, msgBody);
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals(priority, parsed.getPriority());
+        Assert.assertEquals("2", parsed.getSeverity()); // 34 mod 8
+        Assert.assertEquals("4", parsed.getFacility()); // 34 div 8
+        Assert.assertNull(parsed.getVersion());
+        Assert.assertEquals(timestamp, parsed.getTimeStamp());
+        Assert.assertEquals(hostname, parsed.getHostName());
+        Assert.assertEquals(msgBody, parsed.getMsgBody());
+        Assert.assertEquals(rawMessage, parsed.getFullMessage());
+        Assert.assertTrue(parsed.isValid());
+    }
+
+    @Test
+    public void testTrailingNewLine() {
+        final String rawMessage = "<31>Oct 13 15:43:23 localhost.home some message\n";
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+        // a message that ends in a newline must still parse to a valid event
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertTrue(parsed.isValid());
+    }
+
+    @Test
+    public void testVariety() {
+        final List<String> messages = new ArrayList<>();
+
+        // supported examples from RFC 3164
+        messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
+                "lonvick on /dev/pts/8");
+        messages.add("<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!");
+        messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
+                "It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # " +
+                "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
+                "Conveyer1=OK, Conveyer2=OK # %%");
+        messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
+                "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
+
+        // supported examples from RFC 5424
+        messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
+                "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
+        messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
+                "8710 - - %% It's time to make the do-nuts.");
+
+        // non-standard (but common) messages (RFC3339 dates, no version digit)
+        messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
+        messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
+
+        for (final String message : messages) {
+            final byte[] bytes = message.getBytes(CHARSET);
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+            buffer.clear();
+            buffer.put(bytes);
+            // the failure text names the offending sample; a null event fails the assertion instead of raising an NPE
+            final SyslogEvent event = parser.parseEvent(buffer);
+            Assert.assertTrue("expected a valid syslog event for: " + message, event != null && event.isValid());
+        }
+    }
+
+    @Test
+    public void testInvalidPriority() {
+        final String rawMessage = "10 Oct 13 14:14:43 localhost some body of the message";
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+        // the priority is not wrapped in angle brackets: an event is still returned, flagged invalid, with the raw text kept
+        final SyslogEvent parsed = parser.parseEvent(input);
+        Assert.assertNotNull(parsed);
+        Assert.assertFalse(parsed.isValid());
+        Assert.assertEquals(rawMessage, parsed.getFullMessage());
+    }
+
+    @Test
+    public void testParseWithSender() {
+        final String senderAddress = "127.0.0.1";
+        final String rawMessage = "<31>Oct 13 15:43:23 localhost.home some message\n";
+
+        final byte[] encoded = rawMessage.getBytes(CHARSET);
+        final ByteBuffer input = ByteBuffer.allocate(encoded.length);
+        input.clear();
+        input.put(encoded);
+        // the two-argument overload must report the supplied sender back on the event
+        final SyslogEvent parsed = parser.parseEvent(input, senderAddress);
+        Assert.assertNotNull(parsed);
+        Assert.assertTrue(parsed.isValid());
+        Assert.assertEquals(senderAddress, parsed.getSender());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 841818a..d48b755 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -46,7 +46,7 @@
         <module>nifi-image-bundle</module>
         <module>nifi-avro-bundle</module>
         <module>nifi-couchbase-bundle</module>
-    </modules>
+  </modules>
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -131,4 +131,4 @@
             </dependency>
         </dependencies>
     </dependencyManagement>
-</project>
+</project>
\ No newline at end of file


[3/4] nifi git commit: NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread - Added comments and code review changes - fixed findbugs bug

Posted by tk...@apache.org.
NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread
         - Added comments and code review changes
         - fixed findbugs bug


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5611dac3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5611dac3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5611dac3

Branch: refs/heads/master
Commit: 5611dac3f88efb9ba3148634b0546054363ff5b2
Parents: 9c54243
Author: Tony Kurc <tr...@gmail.com>
Authored: Fri Oct 30 08:45:06 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 4 18:00:18 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 328 +++++++++++++------
 .../nifi/processors/standard/PutSyslog.java     |  45 +--
 .../processors/standard/TestListenSyslog.java   |   3 +-
 3 files changed, 247 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 9f57c9f..8012b88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,6 +16,34 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -40,29 +68,6 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -104,7 +109,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .defaultValue("1 MB")
             .required(true)
             .build();
-
+    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("Max number of TCP connections")
+            .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+            .defaultValue("2")
+            .required(true)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -132,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(CHARSET);
+        descriptors.add(MAX_CONNECTIONS);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -168,14 +180,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
+        final int maxConnections;
+
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            maxConnections = 1;
+        } else{
+            maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+        }
 
         parser = new SyslogParser(Charset.forName(charSet));
-        bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+        bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
         syslogEvents = new LinkedBlockingQueue<>(10);
         errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
 
         // create either a UDP or TCP reader and call open() to bind to the given port
-        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
         channelReader.open(port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelReader);
@@ -185,12 +204,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     }
 
     // visible for testing to be overridden and provide a mock ChannelReader if desired
-    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
             throws IOException {
         if (protocol.equals(UDP_VALUE.getValue())) {
             return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
         } else {
-            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
         }
     }
 
@@ -287,6 +306,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final ProcessorLog logger;
         private DatagramChannel datagramChannel;
         private volatile boolean stopped = false;
+        private Selector selector;
 
         public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
                                      final ProcessorLog logger) {
@@ -308,37 +328,48 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             datagramChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            datagramChannel.register(selector, SelectionKey.OP_READ);
         }
 
         @Override
         public void run() {
+            final ByteBuffer buffer = bufferPool.poll();
             while (!stopped) {
-                final ByteBuffer buffer = bufferPool.poll();
                 try {
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
-
-                    final SocketAddress sender = datagramChannel.receive(buffer);
-                    if (sender == null) {
-                        Thread.sleep(1000L); // nothing to do so wait...
-                    } else {
-                        final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
-                        logger.trace(event.getFullMessage());
-                        syslogEvents.put(event); // block until space is available
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()) {
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()) {
+                                continue;
+                            }
+                            DatagramChannel channel = (DatagramChannel) key.channel();
+                            SocketAddress sender;
+                            buffer.clear();
+                            while (!stopped && (sender = channel.receive(buffer)) != null) {
+                                final SyslogEvent event;
+                                if (sender instanceof InetSocketAddress) {
+                                    event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
+                                } else {
+                                    event = syslogParser.parseEvent(buffer);
+                                }
+                                logger.trace(event.getFullMessage());
+                                syslogEvents.put(event); // block until space is available
+                            }
+                        }
                     }
                 } catch (InterruptedException e) {
-                    stop();
+                    stopped = true;
                 } catch (IOException e) {
                     logger.error("Error reading from DatagramChannel", e);
-                }  finally {
-                    if (buffer != null) {
-                        bufferPool.returnBuffer(buffer, 0);
-                    }
                 }
             }
+            if (buffer != null) {
+                bufferPool.returnBuffer(buffer, 0);
+            }
         }
 
         @Override
@@ -348,11 +379,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void stop() {
+            selector.wakeup();
             stopped = true;
         }
 
         @Override
         public void close() {
+            IOUtils.closeQuietly(selector);
             IOUtils.closeQuietly(datagramChannel);
         }
     }
@@ -367,21 +400,27 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
-        private ServerSocketChannel serverSocketChannel;
-        private ExecutorService executor = Executors.newFixedThreadPool(2);
+        private final ExecutorService executor;
         private volatile boolean stopped = false;
+        private Selector selector;
+        private final BlockingQueue<SelectionKey> keyQueue;
+        private final int maxConnections;
+        private final AtomicInteger currentConnections = new AtomicInteger(0);
 
         public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
-                                   final ProcessorLog logger) {
+                                   final ProcessorLog logger, final int maxConnections) {
             this.bufferPool = bufferPool;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
+            this.maxConnections = maxConnections;
+            this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+            this.executor = Executors.newFixedThreadPool(maxConnections);
         }
 
         @Override
         public void open(final int port, int maxBufferSize) throws IOException {
-            serverSocketChannel = ServerSocketChannel.open();
+            final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(false);
             if (maxBufferSize > 0) {
                 serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
@@ -391,42 +430,85 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             serverSocketChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         }
 
         @Override
         public void run() {
             while (!stopped) {
                 try {
-                    final SocketChannel socketChannel = serverSocketChannel.accept();
-                    if (socketChannel == null) {
-                        Thread.sleep(1000L); // wait for an incoming connection...
-                    } else {
-                        final SocketChannelHandler handler = new SocketChannelHandler(
-                                bufferPool, socketChannel, syslogParser, syslogEvents, logger);
-                        logger.debug("Accepted incoming connection");
-                        executor.submit(handler);
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()){
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()){
+                                continue;
+                            }
+                            if (key.isAcceptable()) {
+                                // Handle new connections coming in
+                                final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+                                final SocketChannel socketChannel = channel.accept();
+                                // Check for available connections
+                                if (currentConnections.incrementAndGet() > maxConnections){
+                                    currentConnections.decrementAndGet();
+                                    logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+                                    IOUtils.closeQuietly(socketChannel);
+                                    continue;
+                                }
+                                logger.debug("Accepted incoming connection from {}",
+                                        new Object[]{socketChannel.getRemoteAddress().toString()} );
+                                // Set socket to non-blocking, and register with selector
+                                socketChannel.configureBlocking(false);
+                                SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+                                // Prepare the byte buffer for the reads, clear it out and attach to key
+                                ByteBuffer buffer = bufferPool.poll();
+                                buffer.clear();
+                                buffer.mark();
+                                readKey.attach(buffer);
+                            } else if (key.isReadable()) {
+                                // Clear the key's interest ops so the selector ignores this channel until the handler is done reading
+                                key.interestOps(0);
+                                // Create and execute the read handler
+                                final SocketChannelHandler handler = new SocketChannelHandler(key, this,
+                                        syslogParser, syslogEvents, logger);
+                                // and launch the thread
+                                executor.execute(handler);
+                            }
+                        }
+                    }
+                    // Restore read interest on all idle sockets that handlers queued for re-selection
+                    SelectionKey key;
+                    while((key = keyQueue.poll()) != null){
+                        key.interestOps(SelectionKey.OP_READ);
                     }
                 } catch (IOException e) {
                     logger.error("Error accepting connection from SocketChannel", e);
-                } catch (InterruptedException e) {
-                    stop();
                 }
             }
         }
 
         @Override
         public int getPort() {
-            return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+            // Return the port for the key listening for accepts
+            for(SelectionKey key : selector.keys()){
+                if (key.isValid() && key.isAcceptable()) {
+                    return ((SocketChannel)key.channel()).socket().getLocalPort();
+                }
+            }
+            return 0;
         }
 
         @Override
         public void stop() {
             stopped = true;
+            selector.wakeup();
         }
 
         @Override
         public void close() {
-            IOUtils.closeQuietly(serverSocketChannel);
             executor.shutdown();
             try {
                 // Wait a while for existing tasks to terminate
@@ -439,6 +521,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 // Preserve interrupt status
                 Thread.currentThread().interrupt();
             }
+            for(SelectionKey key : selector.keys()){
+                IOUtils.closeQuietly(key.channel());
+            }
+            IOUtils.closeQuietly(selector);
+        }
+
+        public void completeConnection(SelectionKey key) {
+            // connection is done. Return the buffer to the pool
+            bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+            currentConnections.decrementAndGet();
+        }
+
+        public void addBackForSelection(SelectionKey key) {
+            keyQueue.offer(key);
+            selector.wakeup();
         }
 
     }
@@ -449,17 +546,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
      */
     public static class SocketChannelHandler implements Runnable {
 
-        private final BufferPool bufferPool;
-        private final SocketChannel socketChannel;
+        private final SelectionKey key;
+        private final SocketChannelReader dispatcher;
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
 
-        public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+        public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
                                     final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
-            this.bufferPool = bufferPool;
-            this.socketChannel = socketChannel;
+            this.key = key;
+            this.dispatcher = dispatcher;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
@@ -467,55 +564,72 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void run() {
-            try {
-                int bytesRead = 0;
-                while (bytesRead >= 0 && !Thread.interrupted()) {
-
-                    final ByteBuffer buffer = bufferPool.poll();
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
+            boolean eof = false;
+            SocketChannel socketChannel = null;
+            ByteBuffer socketBuffer = null;
 
-                    try {
-                        // read until the buffer is full
-                        bytesRead = socketChannel.read(buffer);
-                        while (bytesRead > 0) {
-                            bytesRead = socketChannel.read(buffer);
-                        }
-                        buffer.flip();
-
-                        // go through the buffer looking for the end of each message
-                        int bufferLength = buffer.limit();
-                        for (int i = 0; i < bufferLength; i++) {
-                            byte currByte = buffer.get(i);
-                            currBytes.write(currByte);
-
-                            // at the end of a message so parse an event, reset the buffer, and break out of the loop
-                            if (currByte == '\n') {
-                                final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
-                                        socketChannel.socket().getInetAddress().toString());
-                                logger.trace(event.getFullMessage());
-                                syslogEvents.put(event); // block until space is available
-                                currBytes.reset();
-                            }
+            try {
+                int bytesRead;
+                socketChannel = (SocketChannel) key.channel();
+                socketBuffer = (ByteBuffer) key.attachment();
+                // read until the buffer is full
+                while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
+                    // prepare byte buffer for reading
+                    socketBuffer.flip();
+                    // mark the current position as start, in case of partial message read
+                    socketBuffer.mark();
+
+                    // get total bytes in buffer
+                    int total = socketBuffer.remaining();
+                    // go through the buffer looking for the end of each message
+                    currBytes.reset();
+                    for (int i = 0; i < total; i++) {
+                        // NOTE: For higher throughput, the search for \n and the copy into the byte
+                        // stream could be improved
+                        // Pull data out of buffer and cram into byte array
+                        byte currByte = socketBuffer.get();
+                        currBytes.write(currByte);
+
+                        // check if at end of a message
+                        if (currByte == '\n') {
+                            // parse an event, reset the buffer
+                            final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
+                                    socketChannel.socket().getInetAddress().toString());
+                            logger.trace(event.getFullMessage());
+                            syslogEvents.put(event); // block until space is available
+                            currBytes.reset();
+                            // Mark this as the start of the next message
+                            socketBuffer.mark();
                         }
-                    } finally {
-                        bufferPool.returnBuffer(buffer, 0);
                     }
+                    // Preserve bytes in buffer for next call to run
+                    // NOTE: This code could benefit from the  two ByteBuffer read calls to avoid
+                    //  this compact for higher throughput
+                    socketBuffer.reset();
+                    socketBuffer.compact();
+                    logger.debug("done handling SocketChannel");
+                }
+                // Check for closed socket
+                if( bytesRead < 0 ){
+                    eof = true;
                 }
-
-                logger.debug("done handling SocketChannel");
             } catch (ClosedByInterruptException | InterruptedException e) {
-                // nothing to do here
+                logger.debug("read loop interrupted, closing connection");
+                // Treat same as closed socket
+                eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
+                // Treat same as closed socket
+                eof = true;
             } finally {
-                IOUtils.closeQuietly(socketChannel);
+                if(eof == true) {
+                    IOUtils.closeQuietly(socketChannel);
+                    dispatcher.completeConnection(key);
+                } else {
+                    dispatcher.addBackForSelection(key);
+                }
             }
         }
-
     }
 
     static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 502b26f..5e558ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor {
         }
     }
 
+    private void pruneIdleSenders(final long idleThreshold){
+        long currentTime = System.currentTimeMillis();
+        final List<ChannelSender> putBack = new ArrayList<>();
+
+        // if a connection hasn't been used within the threshold then it gets closed
+        ChannelSender sender;
+        while ((sender = senderPool.poll()) != null) {
+            if (currentTime > (sender.lastUsed + idleThreshold)) {
+                getLogger().debug("Closing idle connection...");
+                sender.close();
+            } else {
+                putBack.add(sender);
+            }
+        }
+        // re-queue senders that weren't idle, but if the queue is full then close the sender
+        for (ChannelSender putBackSender : putBack) {
+            boolean returned = senderPool.offer(putBackSender);
+            if (!returned) {
+                putBackSender.close();
+            }
+        }
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final String protocol = context.getProperty(PROTOCOL).getValue();
@@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
 
         final List<FlowFile> flowFiles = session.get(batchSize);
         if (flowFiles == null || flowFiles.isEmpty()) {
-            final List<ChannelSender> putBack = new ArrayList<>();
-            final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
-
-            // if a connection hasn't been used with in the threshold then it gets closed
-            ChannelSender sender;
-            while ((sender = senderPool.poll()) != null) {
-                if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) {
-                    getLogger().debug("Closing idle connection...");
-                    sender.close();
-                } else {
-                    putBack.add(sender);
-                }
-            }
-
-            // re-queue senders that weren't idle, but if the queue is full then close the sender
-            for (ChannelSender putBackSender : putBack) {
-                boolean returned = senderPool.offer(putBackSender);
-                if (!returned) {
-                    putBackSender.close();
-                }
-            }
+            pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 0e0d972..eb71f88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -391,7 +391,8 @@ public class TestListenSyslog {
         }
 
         @Override
-        protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+        protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
+                final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
             return new ChannelReader() {
                 @Override
                 public void open(int port, int maxBufferSize) throws IOException {


[2/4] nifi git commit: NIFI-274 Adding ListenSyslog and PutSyslog to standard processors. - Refactoring connection handling on put side, removing number of buffers from properties and basing it off concurrent tasks for the processor. - Refactoring some o

Posted by tk...@apache.org.
NIFI-274 Adding ListenSyslog and PutSyslog to standard processors.
- Refactoring connection handling on put side, removing number of buffers from properties and basing it off concurrent tasks for the processor.
- Refactoring some of the TCP handling so it keeps reading from a connection until the client closes it
- Adding an error queue
- Adding a sender field on the syslog event to record the system that sent the message


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9c542432
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9c542432
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9c542432

Branch: refs/heads/master
Commit: 9c542432da7c9df6087a180cd939820dcce7008d
Parents: 201eac0
Author: Bryan Bende <bb...@apache.org>
Authored: Tue Oct 13 09:48:19 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 4 17:56:13 2015 -0500

----------------------------------------------------------------------
 .../standard/AbstractSyslogProcessor.java       |  81 +++
 .../nifi/processors/standard/ListenSyslog.java  | 527 +++++++++++++++++++
 .../nifi/processors/standard/PutSyslog.java     | 460 ++++++++++++++++
 .../processors/standard/util/SyslogEvent.java   | 180 +++++++
 .../processors/standard/util/SyslogParser.java  | 165 ++++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../processors/standard/TestListenSyslog.java   | 426 +++++++++++++++
 .../nifi/processors/standard/TestPutSyslog.java | 398 ++++++++++++++
 .../standard/util/TestSyslogParser.java         | 253 +++++++++
 nifi-nar-bundles/pom.xml                        |   4 +-
 10 files changed, 2494 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
new file mode 100644
index 0000000..f7d5eeb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base class for Syslog processors.
+ *
+ * <p>Declares the property descriptors that subclasses can expose (protocol, port and
+ * character set) together with the FlowFile attribute keys used to describe a Syslog message.</p>
+ */
+public abstract class AbstractSyslogProcessor extends AbstractProcessor {
+
+    /** Allowable value selecting TCP for the {@link #PROTOCOL} property. */
+    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
+    /** Allowable value selecting UDP for the {@link #PROTOCOL} property. */
+    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
+
+    /** Transport protocol used for Syslog communication; defaults to UDP. */
+    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+            .Builder().name("Protocol")
+            .description("The protocol for Syslog communication, either TCP or UDP.")
+            .required(true)
+            .allowableValues(TCP_VALUE, UDP_VALUE)
+            .defaultValue(UDP_VALUE.getValue())
+            .build();
+    /** Port used for Syslog communication; required and has no default. */
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port for Syslog communication.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    /** Character set of the Syslog messages; defaults to UTF-8. */
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies which character set of the Syslog messages")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+
+    /**
+     * FlowFile Attributes for each Syslog message.
+     */
+    public enum SyslogAttributes implements FlowFileAttributeKey {
+        PRIORITY("syslog.priority"),
+        SEVERITY("syslog.severity"),
+        FACILITY("syslog.facility"),
+        VERSION("syslog.version"),
+        TIMESTAMP("syslog.timestamp"),
+        HOSTNAME("syslog.hostname"),
+        SENDER("syslog.sender"),
+        BODY("syslog.body"),
+        VALID("syslog.valid");
+
+        // final: enum constants are shared singletons, so the attribute name must never change
+        private final String key;
+
+        SyslogAttributes(String key) {
+            this.key = key;
+        }
+
+        /**
+         * @return the FlowFile attribute name for this constant
+         */
+        @Override
+        public String key() {
+            return key;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
new file mode 100644
index 0000000..9f57c9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processor that listens for Syslog messages on a port over UDP or TCP.
+ *
+ * <p>A background reader parses incoming data into {@code SyslogEvent}s and places them on a
+ * queue; each trigger of the processor takes one queued event, writes its raw bytes as the
+ * content of a new FlowFile, and routes it to success or invalid depending on whether the
+ * event is valid.</p>
+ */
+@Tags({"syslog", "listen", "udp", "tcp", "logs"})
+@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
+        "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
+        "where version is optional. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
+        "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If an incoming messages matches one of these patterns, the message will be " +
+        "parsed and the individual pieces will be placed in FlowFile attributes, with the original message in the content of the FlowFile. If an incoming " +
+        "message does not match one of these patterns it will not be parsed and the syslog.valid attribute will be set to false with the original message " +
+        "in the content of the FlowFile. Valid messages will be transferred on the success relationship, and invalid messages will be transferred on the " +
+        "invalid relationship.")
+@WritesAttributes({ @WritesAttribute(attribute="syslog.priority", description="The priority of the Syslog message."),
+                    @WritesAttribute(attribute="syslog.severity", description="The severity of the Syslog message derived from the priority."),
+                    @WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."),
+                    @WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."),
+                    @WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."),
+                    @WritesAttribute(attribute="syslog.hostname", description="The hostname of the Syslog message."),
+                    @WritesAttribute(attribute="syslog.sender", description="The hostname of the Syslog server that sent the message."),
+                    @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
+                    @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
+                            "If this value is false, the other attributes will be empty and only the original message will be available in the content."),
+                    @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
+public class ListenSyslog extends AbstractSyslogProcessor {
+
+    /**
+     * Size of each pooled buffer used to receive Syslog messages.
+     *
+     * <p>The default is 65507 bytes, the largest payload a single UDP datagram can carry over
+     * IPv4 (65535 minus the 8 byte UDP header and the 20 byte IP header), so one buffer can always
+     * hold one datagram. One buffer of this size is allocated per concurrent task.</p>
+     */
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " +
+                    "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
+                    "from an incoming connection until the buffer is full, or the connection is closed. ")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")
+            .required(true)
+            .build();
+    // Requested size of the socket receive buffer; passed to the channel reader's open() as maxBufferSize.
+    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Socket Buffer")
+            .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+
+
+    // Destination for events whose isValid() is true.
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
+            .build();
+    // Destination for events whose isValid() is false.
+    public static final Relationship REL_INVALID = new Relationship.Builder()
+            .name("invalid")
+            .description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.")
+            .build();
+
+    // Unmodifiable views populated once in init().
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    // Runtime state (re)created on every onScheduled() call; volatile because it is assigned in
+    // onScheduled() and read from onTrigger(), onPropertyModified() and onUnscheduled().
+    private volatile BufferPool bufferPool;
+    private volatile ChannelReader channelReader;
+    private volatile SyslogParser parser;
+    // Events produced by the channel reader, waiting to be turned into FlowFiles.
+    private volatile BlockingQueue<SyslogEvent> syslogEvents;
+    // Events whose processing failed in onTrigger(); polled before syslogEvents on the next trigger.
+    private volatile BlockingQueue<SyslogEvent> errorEvents;
+
+    /**
+     * Builds the immutable set of relationships and the immutable, ordered list of supported
+     * property descriptors for this processor.
+     *
+     * @param context the initialization context (not used)
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> outgoing = new HashSet<>();
+        Collections.addAll(outgoing, REL_SUCCESS, REL_INVALID);
+        this.relationships = Collections.unmodifiableSet(outgoing);
+
+        // list order is the order in which the properties were originally declared
+        final List<PropertyDescriptor> supported = new ArrayList<>();
+        Collections.addAll(supported, PROTOCOL, PORT, RECV_BUFFER_SIZE, MAX_SOCKET_BUFFER_SIZE, CHARSET);
+        this.descriptors = Collections.unmodifiableList(supported);
+    }
+
+    /**
+     * @return the unmodifiable set of relationships built in {@code init}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @return the unmodifiable list of property descriptors built in {@code init}
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    /**
+     * Discards every event still waiting in either queue whenever a property changes.
+     * Both queues are null until the processor has been scheduled at least once.
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        // queued events were received under the previous configuration, so drop them
+        if (errorEvents != null) {
+            errorEvents.clear();
+        }
+        if (syslogEvents != null) {
+            syslogEvents.clear();
+        }
+    }
+
+    /**
+     * Reads the configured properties, creates the parser, buffer pool and event queues, binds a
+     * channel reader to the configured port and starts it on a daemon thread.
+     *
+     * @param context supplies the property values and the maximum number of concurrent tasks
+     * @throws IOException if the channel reader cannot be created or opened
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        final int listenPort = context.getProperty(PORT).asInteger();
+        final int receiveBufferBytes = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int socketBufferBytes = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String transport = context.getProperty(PROTOCOL).getValue();
+        final Charset messageCharset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final int maxTasks = context.getMaxConcurrentTasks();
+
+        parser = new SyslogParser(messageCharset);
+        // one receive buffer and one error-queue slot per concurrent task
+        bufferPool = new BufferPool(maxTasks, receiveBufferBytes, false, Integer.MAX_VALUE);
+        syslogEvents = new LinkedBlockingQueue<>(10);
+        errorEvents = new LinkedBlockingQueue<>(maxTasks);
+
+        // the reader is UDP or TCP depending on the protocol; open() binds it to the port
+        channelReader = createChannelReader(transport, bufferPool, parser, syslogEvents);
+        channelReader.open(listenPort, socketBufferBytes);
+
+        final Thread readerThread = new Thread(channelReader, "ListenSyslog [" + getIdentifier() + "]");
+        readerThread.setDaemon(true);
+        readerThread.start();
+    }
+
+    // visible for testing to be overridden and provide a mock ChannelReader if desired
+    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+            throws IOException {
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+        } else {
+            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+        }
+    }
+
+    /**
+     * Used by tests to discover the port the reader is actually bound to.
+     *
+     * @return the reader's port, or 0 when no reader has been created yet
+     */
+    protected int getPort() {
+        if (channelReader != null) {
+            return channelReader.getPort();
+        }
+        return 0;
+    }
+
+    /**
+     * Signals the channel reader to stop and then closes it. Does nothing when no reader exists.
+     */
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (channelReader == null) {
+            return;
+        }
+        // stop first so the reader loop sees the flag before its channel is closed
+        channelReader.stop();
+        channelReader.close();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        // try to pull from the error queue first, if empty then pull from main queue
+        SyslogEvent initialEvent = errorEvents.poll();
+        if (initialEvent == null) {
+            initialEvent = syslogEvents.poll();
+        }
+
+        // if nothing in either queue then just return
+        if (initialEvent == null) {
+            return;
+        }
+
+        final SyslogEvent event = initialEvent;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
+        attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
+        attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
+        attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
+        attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
+        attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
+        attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
+        attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
+        attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        try {
+            // write the raw bytes of the message as the FlowFile content
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    out.write(event.getRawMessage());
+                }
+            });
+
+            if (event.isValid()) {
+                getLogger().info("Transferring {} to success", new Object[]{flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
+                session.transfer(flowFile, REL_INVALID);
+            }
+
+        } catch (ProcessException e) {
+            getLogger().error("Error processing Syslog message", e);
+            errorEvents.offer(event);
+            session.remove(flowFile);
+        }
+    }
+
+    /**
+     * Reads messages from a channel until told to stop.
+     *
+     * <p>The processor calls {@link #open(int, int)} once, runs the reader on its own thread, and
+     * on unscheduling calls {@link #stop()} followed by {@link #close()}.</p>
+     */
+    public interface ChannelReader extends Runnable {
+
+        /**
+         * Opens the underlying channel and binds it to the given port.
+         *
+         * @param port the port to bind to
+         * @param maxBufferSize requested socket receive buffer size in bytes; the implementations
+         *                      in this file only apply it when it is greater than zero
+         * @throws IOException if the channel cannot be opened, configured or bound
+         */
+        void open(int port, int maxBufferSize) throws IOException;
+
+        /**
+         * @return the local port of the underlying channel; the implementations in this file
+         *         return 0 when the channel has not been opened
+         */
+        int getPort();
+
+        /**
+         * Asks the read loop to finish; does not close the channel.
+         */
+        void stop();
+
+        /**
+         * Closes the underlying channel and releases any resources held by the reader.
+         */
+        void close();
+    }
+
+    /**
+     * Reads from the Datagram channel into an available buffer. If data is read it is parsed into a
+     * SyslogEvent and queued for processing; the buffer is always returned to the buffer pool.
+     *
+     * <p>The read loop re-checks the stop flag while waiting for queue space, so {@link #stop()}
+     * terminates the loop even when the event queue is full and nothing is draining it.</p>
+     */
+    public static class DatagramChannelReader implements ChannelReader {
+
+        // how long a single attempt to queue an event waits before the stop flag is re-checked
+        private static final long OFFER_TIMEOUT_MILLIS = 100L;
+
+        private final BufferPool bufferPool;
+        private final SyslogParser syslogParser;
+        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final ProcessorLog logger;
+        private DatagramChannel datagramChannel;
+        private volatile boolean stopped = false;
+
+        public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
+                                     final ProcessorLog logger) {
+            this.bufferPool = bufferPool;
+            this.syslogParser = syslogParser;
+            this.syslogEvents = syslogEvents;
+            this.logger = logger;
+        }
+
+        /**
+         * Opens a non-blocking datagram channel, requests the given receive buffer size when it is
+         * positive (warning if less was granted), and binds the channel to the port.
+         */
+        @Override
+        public void open(final int port, int maxBufferSize) throws IOException {
+            datagramChannel = DatagramChannel.open();
+            datagramChannel.configureBlocking(false);
+            if (maxBufferSize > 0) {
+                datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+                final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+                if (actualReceiveBufSize < maxBufferSize) {
+                    logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
+                }
+            }
+            datagramChannel.socket().bind(new InetSocketAddress(port));
+        }
+
+        /**
+         * Receives datagrams until stopped, parsing each one into an event and queueing it.
+         */
+        @Override
+        public void run() {
+            while (!stopped) {
+                final ByteBuffer buffer = bufferPool.poll();
+                try {
+                    if (buffer == null) {
+                        Thread.sleep(10L);
+                        logger.debug("no available buffers, continuing...");
+                        continue;
+                    }
+
+                    // the channel is non-blocking: a null sender means no datagram was available
+                    final SocketAddress sender = datagramChannel.receive(buffer);
+                    if (sender == null) {
+                        Thread.sleep(1000L); // nothing to do so wait...
+                    } else {
+                        final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
+                        logger.trace(event.getFullMessage());
+                        queueEvent(event);
+                    }
+                } catch (InterruptedException e) {
+                    stop();
+                    // preserve the interrupt status for whoever owns this thread
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    // closing the channel during shutdown makes receive() fail; that is expected
+                    if (!stopped) {
+                        logger.error("Error reading from DatagramChannel", e);
+                    }
+                }  finally {
+                    if (buffer != null) {
+                        bufferPool.returnBuffer(buffer, 0);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Waits for space in the event queue, giving up once the reader has been stopped so the
+         * read loop can never block forever on a full queue.
+         */
+        private void queueEvent(final SyslogEvent event) throws InterruptedException {
+            boolean queued = false;
+            while (!queued && !stopped) {
+                queued = syslogEvents.offer(event, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            }
+            if (!queued) {
+                logger.debug("reader stopped before the event could be queued, dropping event");
+            }
+        }
+
+        @Override
+        public int getPort() {
+            return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort();
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(datagramChannel);
+        }
+    }
+
+    /**
+     * Accepts Socket connections on the given port and creates a handler for each connection to
+     * be executed by a thread pool.
+     *
+     * <p>The pool has two threads, so at most two connections are serviced at the same time;
+     * further accepted connections wait in the executor's queue.</p>
+     */
+    public static class SocketChannelReader implements ChannelReader {
+
+        private final BufferPool bufferPool;
+        private final SyslogParser syslogParser;
+        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final ProcessorLog logger;
+        private ServerSocketChannel serverSocketChannel;
+        private final ExecutorService executor = Executors.newFixedThreadPool(2);
+        private volatile boolean stopped = false;
+
+        public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
+                                   final ProcessorLog logger) {
+            this.bufferPool = bufferPool;
+            this.syslogParser = syslogParser;
+            this.syslogEvents = syslogEvents;
+            this.logger = logger;
+        }
+
+        /**
+         * Opens a non-blocking server socket channel, requests the given receive buffer size when
+         * it is positive (warning if less was granted), and binds the channel to the port.
+         */
+        @Override
+        public void open(final int port, int maxBufferSize) throws IOException {
+            serverSocketChannel = ServerSocketChannel.open();
+            serverSocketChannel.configureBlocking(false);
+            if (maxBufferSize > 0) {
+                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+                final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+                if (actualReceiveBufSize < maxBufferSize) {
+                    logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
+                }
+            }
+            serverSocketChannel.socket().bind(new InetSocketAddress(port));
+        }
+
+        /**
+         * Accepts connections until stopped, handing each one to the thread pool.
+         */
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    // the server channel is non-blocking: null means no connection is pending
+                    final SocketChannel socketChannel = serverSocketChannel.accept();
+                    if (socketChannel == null) {
+                        Thread.sleep(1000L); // wait for an incoming connection...
+                    } else {
+                        final SocketChannelHandler handler = new SocketChannelHandler(
+                                bufferPool, socketChannel, syslogParser, syslogEvents, logger);
+                        logger.debug("Accepted incoming connection");
+                        executor.submit(handler);
+                    }
+                } catch (IOException e) {
+                    // closing the channel during shutdown makes accept() fail; that is expected
+                    if (!stopped) {
+                        logger.error("Error accepting connection from SocketChannel", e);
+                    }
+                } catch (InterruptedException e) {
+                    stop();
+                    // preserve the interrupt status for whoever owns this thread
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        @Override
+        public int getPort() {
+            return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        /**
+         * Closes the server channel, then shuts the handler pool down, interrupting handlers that
+         * have not finished within one second.
+         */
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(serverSocketChannel);
+            executor.shutdown();
+            try {
+                // Wait a while for existing tasks to terminate
+                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException ie) {
+                // (Re-)Cancel if current thread also interrupted
+                executor.shutdownNow();
+                // Preserve interrupt status
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
+    /**
+     * Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for
+     * processing, otherwise the buffer is returned to the buffer pool.
+     */
+    public static class SocketChannelHandler implements Runnable {
+
+        private final BufferPool bufferPool;
+        private final SocketChannel socketChannel;
+        private final SyslogParser syslogParser;
+        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final ProcessorLog logger;
+        private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
+
+        public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+                                    final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
+            this.bufferPool = bufferPool;
+            this.socketChannel = socketChannel;
+            this.syslogParser = syslogParser;
+            this.syslogEvents = syslogEvents;
+            this.logger = logger;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int bytesRead = 0;
+                while (bytesRead >= 0 && !Thread.interrupted()) {
+
+                    final ByteBuffer buffer = bufferPool.poll();
+                    if (buffer == null) {
+                        Thread.sleep(10L);
+                        logger.debug("no available buffers, continuing...");
+                        continue;
+                    }
+
+                    try {
+                        // read until the buffer is full
+                        bytesRead = socketChannel.read(buffer);
+                        while (bytesRead > 0) {
+                            bytesRead = socketChannel.read(buffer);
+                        }
+                        buffer.flip();
+
+                        // go through the buffer looking for the end of each message
+                        int bufferLength = buffer.limit();
+                        for (int i = 0; i < bufferLength; i++) {
+                            byte currByte = buffer.get(i);
+                            currBytes.write(currByte);
+
+                            // at the end of a message so parse an event, reset the buffer, and break out of the loop
+                            if (currByte == '\n') {
+                                final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
+                                        socketChannel.socket().getInetAddress().toString());
+                                logger.trace(event.getFullMessage());
+                                syslogEvents.put(event); // block until space is available
+                                currBytes.reset();
+                            }
+                        }
+                    } finally {
+                        bufferPool.returnBuffer(buffer, 0);
+                    }
+                }
+
+                logger.debug("done handling SocketChannel");
+            } catch (ClosedByInterruptException | InterruptedException e) {
+                // nothing to do here
+            } catch (IOException e) {
+                logger.error("Error reading from channel", e);
+            } finally {
+                IOUtils.closeQuietly(socketChannel);
+            }
+        }
+
+    }
+
+    /**
+     * Logs a warning that the socket receive buffer could not be set to the requested size.
+     *
+     * @param logger the logger to write the warning to
+     * @param maxBufferSize the size in bytes that was requested
+     * @param actualReceiveBufSize the size in bytes that the socket reports after the request
+     */
+    static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
+        logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+                + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
+                + "maximum receive buffer");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
new file mode 100644
index 0000000..502b26f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.util.ObjectHolder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Processor that builds a Syslog message from the "Message ___" properties for each incoming
+ * FlowFile, validates it against the patterns in {@link SyslogParser#MESSAGE_PATTERNS} and sends
+ * it to the configured host and port over TCP or UDP.
+ *
+ * <p>Senders and send buffers are pooled; both pools are sized from the maximum number of
+ * concurrent tasks when the processor is scheduled.</p>
+ */
+@TriggerWhenEmpty
+@Tags({"syslog", "put", "udp", "tcp", "logs"})
+@CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " +
+        "which can use expression language to generate messages from incoming FlowFiles. The properties are used to construct messages of the form: " +
+        "(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional.  The constructed messages are checked against regular expressions for " +
+        "RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
+        "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
+        "above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " +
+        "failures routed to the failure relationship.")
+public class PutSyslog extends AbstractSyslogProcessor {
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Syslog server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Send Buffer Size")
+            .description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " +
+                    "Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("2048 B")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
+            .Builder().name("Batch Size")
+            .description("The number of incoming FlowFiles to process in a single execution of this processor.")
+            .required(true)
+            .defaultValue("25")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
+            .Builder().name("Idle Connection Expiration")
+            .description("The amount of time a connection should be held open without being used before closing the connection.")
+            .required(true)
+            .defaultValue("5 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MSG_PRIORITY = new PropertyDescriptor
+            .Builder().name("Message Priority")
+            .description("The priority for the Syslog messages, excluding < >.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MSG_VERSION = new PropertyDescriptor
+            .Builder().name("Message Version")
+            .description("The version for the Syslog messages.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MSG_TIMESTAMP = new PropertyDescriptor
+            .Builder().name("Message Timestamp")
+            .description("The timestamp for the Syslog messages. The timestamp can be an RFC5424 timestamp with a format of " +
+                    "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", or it can be an RFC3164 timestamp " +
+                    "with a format of \"MMM d HH:mm:ss\".")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MSG_HOSTNAME = new PropertyDescriptor
+            .Builder().name("Message Hostname")
+            .description("The hostname for the Syslog messages.")
+            .required(true)
+            .defaultValue("${hostname(true)}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MSG_BODY = new PropertyDescriptor
+            .Builder().name("Message Body")
+            .description("The body for the Syslog messages.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to Syslog are sent out this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to Syslog are sent out this relationship.")
+            .build();
+    public static final Relationship REL_INVALID = new Relationship.Builder()
+            .name("invalid")
+            .description("FlowFiles that do not form a valid Syslog message are sent out this relationship.")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    // pools are (re)created every time the processor is scheduled
+    private volatile BlockingQueue<ByteBuffer> bufferPool;
+    private volatile BlockingQueue<ChannelSender> senderPool;
+
+    /**
+     * Builds the immutable lists of supported properties and relationships.
+     *
+     * @param context the initialization context (not used here)
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PROTOCOL);
+        descriptors.add(PORT);
+        descriptors.add(IDLE_EXPIRATION);
+        descriptors.add(SEND_BUFFER_SIZE);
+        descriptors.add(BATCH_SIZE);
+        descriptors.add(CHARSET);
+        descriptors.add(MSG_PRIORITY);
+        descriptors.add(MSG_VERSION);
+        descriptors.add(MSG_TIMESTAMP);
+        descriptors.add(MSG_HOSTNAME);
+        descriptors.add(MSG_BODY);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_INVALID);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Creates the buffer pool and the sender pool, each holding one entry per concurrent task.
+     *
+     * @param context the process context providing the configured properties
+     * @throws IOException if a sender cannot be created; senders created so far are closed first
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        final int maxTasks = context.getMaxConcurrentTasks();
+        final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+
+        this.bufferPool = new LinkedBlockingQueue<>(maxTasks);
+        for (int i = 0; i < maxTasks; i++) {
+            this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
+        }
+
+        // create a pool of senders based on the number of concurrent tasks for this processor
+        final BlockingQueue<ChannelSender> senders = new LinkedBlockingQueue<>(maxTasks);
+        this.senderPool = senders;
+        try {
+            for (int i = 0; i < maxTasks; i++) {
+                senders.offer(createSender(context, bufferPool));
+            }
+        } catch (final IOException | RuntimeException e) {
+            // don't leak the connections that were opened before the failure
+            closeSenders(senders);
+            throw e;
+        }
+    }
+
+    /**
+     * Creates a sender from the host, port, protocol and character set configured on the processor.
+     *
+     * @param context the process context providing the configured properties
+     * @param bufferPool the pool of buffers the sender should use
+     * @return a new sender
+     * @throws IOException if the sender cannot be created
+     */
+    protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+        final int port = context.getProperty(PORT).asInteger();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        final String charSet = context.getProperty(CHARSET).getValue();
+        return createSender(protocol, host, port, Charset.forName(charSet), bufferPool);
+    }
+
+    // visible for testing to override and provide a mock sender if desired
+    protected ChannelSender createSender(final String protocol, final String host, final int port, final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
+            throws IOException {
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            return new DatagramChannelSender(host, port, bufferPool, charset);
+        } else {
+            return new SocketChannelSender(host, port, bufferPool, charset);
+        }
+    }
+
+    /**
+     * Closes every sender remaining in the pool. Does nothing if the pool was never created.
+     */
+    @OnStopped
+    public void onStopped() {
+        final BlockingQueue<ChannelSender> senders = senderPool;
+        if (senders != null) {
+            closeSenders(senders);
+        }
+    }
+
+    // drains the given queue, closing each sender that is removed from it
+    private static void closeSenders(final BlockingQueue<ChannelSender> senders) {
+        ChannelSender sender;
+        while ((sender = senders.poll()) != null) {
+            sender.close();
+        }
+    }
+
+    /**
+     * Sends one Syslog message per FlowFile in the batch. Invalid messages are routed to
+     * {@link #REL_INVALID}, send failures to {@link #REL_FAILURE}, and everything else to
+     * {@link #REL_SUCCESS}. When there are no FlowFiles, idle senders are closed instead.
+     *
+     * @param context the process context providing the configured properties
+     * @param session the session used to obtain and transfer FlowFiles
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            // nothing to send, so use this invocation to close connections that have been idle too long
+            pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+            return;
+        }
+
+        // get a sender from the pool, or create a new one if the pool is empty
+        // if we can't create a new connection then route flow files to failure and yield
+        ChannelSender sender = senderPool.poll();
+        if (sender == null) {
+            try {
+                getLogger().debug("No available connections, creating a new one...");
+                sender = createSender(context, bufferPool);
+            } catch (IOException e) {
+                for (final FlowFile flowFile : flowFiles) {
+                    getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
+                            new Object[]{flowFile}, e);
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+                context.yield();
+                return;
+            }
+        }
+
+        final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
+        try {
+            for (FlowFile flowFile : flowFiles) {
+                final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
+                final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
+                final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+                final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+                final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();
+
+                final StringBuilder messageBuilder = new StringBuilder();
+                messageBuilder.append("<").append(priority).append(">");
+                if (version != null) {
+                    messageBuilder.append(version).append(" ");
+                }
+                messageBuilder.append(timestamp).append(" ").append(hostname).append(" ").append(body);
+
+                final String fullMessage = messageBuilder.toString();
+                getLogger().debug(fullMessage);
+
+                if (isValid(fullMessage)) {
+                    try {
+                        // now that we validated, add a new line if doing TCP
+                        if (protocol.equals(TCP_VALUE.getValue())) {
+                            messageBuilder.append('\n');
+                        }
+
+                        sender.send(messageBuilder.toString());
+                        getLogger().info("Transferring {} to success", new Object[]{flowFile});
+                        session.transfer(flowFile, REL_SUCCESS);
+                    } catch (IOException e) {
+                        getLogger().error("Transferring {} to failure", new Object[]{flowFile}, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                        exceptionHolder.set(e);
+                    }
+                } else {
+                    getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
+                    session.transfer(flowFile, REL_INVALID);
+                }
+            }
+        } finally {
+            // if the connection is still open and no IO errors happened then try to return, if pool is full then close
+            if (sender.isConnected() && exceptionHolder.get() == null) {
+                boolean returned = senderPool.offer(sender);
+                if (!returned) {
+                    sender.close();
+                }
+            } else {
+                // probably already closed here, but quietly close anyway to be safe
+                sender.close();
+            }
+
+        }
+
+    }
+
+    // closes pooled senders whose last use is older than the given threshold (in milliseconds) and re-queues the rest
+    private void pruneIdleSenders(final long idleThreshold) {
+        final List<ChannelSender> putBack = new ArrayList<>();
+
+        // if a connection hasn't been used within the threshold then it gets closed
+        ChannelSender sender;
+        while ((sender = senderPool.poll()) != null) {
+            if (System.currentTimeMillis() > (sender.lastUsed + idleThreshold)) {
+                getLogger().debug("Closing idle connection...");
+                sender.close();
+            } else {
+                putBack.add(sender);
+            }
+        }
+
+        // re-queue senders that weren't idle, but if the queue is full then close the sender
+        for (ChannelSender putBackSender : putBack) {
+            boolean returned = senderPool.offer(putBackSender);
+            if (!returned) {
+                putBackSender.close();
+            }
+        }
+    }
+
+    // a message is valid if it fully matches at least one of the known Syslog message patterns
+    private boolean isValid(final String message) {
+        for (Pattern pattern : SyslogParser.MESSAGE_PATTERNS) {
+            Matcher matcher = pattern.matcher(message);
+            if (matcher.matches()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Base class for sending messages over a channel.
+     */
+    public static abstract class ChannelSender {
+
+        final int port;
+        final String host;
+        final BlockingQueue<ByteBuffer> bufferPool;
+        final Charset charset;
+        volatile long lastUsed;
+
+        ChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
+            this.port = port;
+            this.host = host;
+            this.bufferPool = bufferPool;
+            this.charset = charset;
+            // start the idle clock at creation time; otherwise a sender that has not sent anything yet
+            // would have lastUsed == 0 and be treated as idle on the very first expiration check
+            this.lastUsed = System.currentTimeMillis();
+        }
+
+        /**
+         * Encodes the message with this sender's character set and writes it to the channel.
+         * A pooled buffer is used when one is available and large enough; otherwise a temporary
+         * buffer of exactly the message size is allocated.
+         *
+         * @param message the message to send
+         * @throws IOException if writing to the underlying channel fails
+         */
+        public void send(final String message) throws IOException {
+            final byte[] bytes = message.getBytes(charset);
+
+            boolean shouldReturn = true;
+            ByteBuffer buffer = bufferPool.poll();
+            if (buffer == null) {
+                buffer = ByteBuffer.allocate(bytes.length);
+                shouldReturn = false;
+            } else if (buffer.capacity() < bytes.length) {
+                // compare against capacity, not limit: a pooled buffer still has the limit of the previous message
+                // we need a large buffer so return the one we got and create a new bigger one
+                bufferPool.offer(buffer);
+                buffer = ByteBuffer.allocate(bytes.length);
+                shouldReturn = false;
+            }
+
+            try {
+                buffer.clear();
+                buffer.put(bytes);
+                buffer.flip();
+                write(buffer);
+                lastUsed = System.currentTimeMillis();
+            } finally {
+                if (shouldReturn) {
+                    bufferPool.offer(buffer);
+                }
+            }
+        }
+
+        // write the given buffer to the underlying channel
+        abstract void write(ByteBuffer buffer) throws IOException;
+
+        // returns true if the underlying channel is connected
+        abstract boolean isConnected();
+
+        // close the underlying channel
+        abstract void close();
+    }
+
+    /**
+     * Sends messages over a DatagramChannel.
+     */
+    static class DatagramChannelSender extends ChannelSender {
+
+        final DatagramChannel channel;
+
+        DatagramChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
+            super(host, port, bufferPool, charset);
+            this.channel = DatagramChannel.open();
+            try {
+                this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
+            } catch (final IOException | RuntimeException e) {
+                // the caller never receives this instance, so release the channel here
+                IOUtils.closeQuietly(channel);
+                throw e;
+            }
+        }
+
+        @Override
+        public void write(ByteBuffer buffer) throws IOException {
+            while (buffer.hasRemaining()) {
+                channel.write(buffer);
+            }
+        }
+
+        @Override
+        boolean isConnected() {
+            return channel != null && channel.isConnected();
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(channel);
+        }
+    }
+
+    /**
+     * Sends messages over a SocketChannel.
+     */
+    static class SocketChannelSender extends ChannelSender {
+
+        final SocketChannel channel;
+
+        SocketChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
+            super(host, port, bufferPool, charset);
+            this.channel = SocketChannel.open();
+            try {
+                this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
+            } catch (final IOException | RuntimeException e) {
+                // the caller never receives this instance, so release the channel here
+                IOUtils.closeQuietly(channel);
+                throw e;
+            }
+        }
+
+        @Override
+        public void write(ByteBuffer buffer) throws IOException {
+            while (buffer.hasRemaining()) {
+                channel.write(buffer);
+            }
+        }
+
+        @Override
+        boolean isConnected() {
+            return channel != null && channel.isConnected();
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(channel);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
new file mode 100644
index 0000000..3d06dbe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+/**
+ * Encapsulates the parsed information for a single Syslog event.
+ *
+ * Instances are created through {@link Builder}; all fields are fixed once the event is built.
+ */
+public class SyslogEvent {
+
+    private final String priority;
+    private final String severity;
+    private final String facility;
+    private final String version;
+    private final String timeStamp;
+    private final String hostName;
+    private final String sender;
+    private final String msgBody;
+    private final String fullMessage;
+    private final byte[] rawMessage;
+    private final boolean valid;
+
+    // copies the current state of the builder; the raw message array is shared, not copied
+    private SyslogEvent(final Builder builder) {
+        this.priority = builder.priority;
+        this.severity = builder.severity;
+        this.facility = builder.facility;
+        this.version = builder.version;
+        this.timeStamp = builder.timeStamp;
+        this.hostName = builder.hostName;
+        this.sender = builder.sender;
+        this.msgBody = builder.msgBody;
+        this.fullMessage = builder.fullMessage;
+        this.rawMessage = builder.rawMessage;
+        this.valid = builder.valid;
+    }
+
+    public String getPriority() {
+        return priority;
+    }
+
+    public String getSeverity() {
+        return severity;
+    }
+
+    public String getFacility() {
+        return facility;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public String getTimeStamp() {
+        return timeStamp;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public String getMsgBody() {
+        return msgBody;
+    }
+
+    public String getFullMessage() {
+        return fullMessage;
+    }
+
+    /**
+     * @return the raw message bytes given to the builder; the array is returned as-is, not copied
+     */
+    public byte[] getRawMessage() {
+        return rawMessage;
+    }
+
+    public boolean isValid() {
+        return valid;
+    }
+
+    /**
+     * Mutable builder for {@link SyslogEvent}. Every setter returns the builder so calls can be chained.
+     */
+    public static final class Builder {
+        private String priority;
+        private String severity;
+        private String facility;
+        private String version;
+        private String timeStamp;
+        private String hostName;
+        private String sender;
+        private String msgBody;
+        private String fullMessage;
+        private byte[] rawMessage;
+        private boolean valid;
+
+        /**
+         * Restores every field, including the raw message, to its initial value so the builder
+         * can be reused without carrying over state from a previous event.
+         */
+        public void reset() {
+            this.priority = null;
+            this.severity = null;
+            this.facility = null;
+            this.version = null;
+            this.timeStamp = null;
+            this.hostName = null;
+            this.sender = null;
+            this.msgBody = null;
+            this.fullMessage = null;
+            this.rawMessage = null;
+            this.valid = false;
+        }
+
+        public Builder priority(String priority) {
+            this.priority = priority;
+            return this;
+        }
+
+        public Builder severity(String severity) {
+            this.severity = severity;
+            return this;
+        }
+
+        public Builder facility(String facility) {
+            this.facility = facility;
+            return this;
+        }
+
+        public Builder version(String version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder timestamp(String timestamp) {
+            this.timeStamp = timestamp;
+            return this;
+        }
+
+        public Builder hostname(String hostName) {
+            this.hostName = hostName;
+            return this;
+        }
+
+        public Builder sender(String sender) {
+            this.sender = sender;
+            return this;
+        }
+
+        public Builder msgBody(String msgBody) {
+            this.msgBody = msgBody;
+            return this;
+        }
+
+        public Builder fullMessage(String fullMessage) {
+            this.fullMessage = fullMessage;
+            return this;
+        }
+
+        public Builder rawMessage(byte[] rawMessage) {
+            this.rawMessage = rawMessage;
+            return this;
+        }
+
+        public Builder valid(boolean valid) {
+            this.valid = valid;
+            return this;
+        }
+
+        /**
+         * @return a new event holding the values currently set on this builder
+         */
+        public SyslogEvent build() {
+            return new SyslogEvent(this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
new file mode 100644
index 0000000..fd59e5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.MatchResult;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance.
+ *
+ * The Syslog regular expressions below were adapted from the Apache Flume project.
+ */
+public class SyslogParser {
+
+    public static final String SYSLOG_MSG_RFC5424_0 =
+            "(?:\\<(\\d{1,3})\\>)" + // priority
+                    "(?:(\\d)?\\s?)" + // version
+      /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
+                    "(?:" +
+                    "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
+                    "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
+                    "\\s" + // separator
+                    "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
+                    "\\s" + // separator
+                    "(.*)$"; // body
+
+    public static final String SYSLOG_MSG_RFC3164_0 =
+            "(?:\\<(\\d{1,3})\\>)" +
+                    "(?:(\\d)?\\s?)" + // version
+                    // stamp MMM d HH:mm:ss, single digit date has two spaces
+                    "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
+                    "\\s" + // separator
+                    "([\\w][\\w\\d\\.@-]*)" + // host
+                    "\\s(.*)$";  // body
+
+    public static final Collection<Pattern> MESSAGE_PATTERNS;
+    static {
+        List<Pattern> patterns = new ArrayList<>();
+        patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0));
+        patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0));
+        MESSAGE_PATTERNS = Collections.unmodifiableList(patterns);
+    }
+
+    // capture group positions from the above message patterns
+    public static final int SYSLOG_PRIORITY_POS = 1;
+    public static final int SYSLOG_VERSION_POS = 2;
+    public static final int SYSLOG_TIMESTAMP_POS = 3;
+    public static final int SYSLOG_HOSTNAME_POS = 4;
+    public static final int SYSLOG_BODY_POS = 5;
+
+    private Charset charset;
+
+    public SyslogParser(final Charset charset) {
+        this.charset = charset;
+    }
+
+    /**
+     *  Parses a SyslogEvent from a byte buffer.
+     *
+     * @param buffer a byte buffer containing a syslog message
+     * @return a SyslogEvent parsed from the byte array
+     */
+    public SyslogEvent parseEvent(final ByteBuffer buffer) {
+        return parseEvent(buffer, null);
+    }
+
+    /**
+     *  Parses a SyslogEvent from a byte buffer.
+     *
+     * @param buffer a byte buffer containing a syslog message
+     * @param sender the hostname of the syslog server that sent the message
+     * @return a SyslogEvent parsed from the byte array
+     */
+    public SyslogEvent parseEvent(final ByteBuffer buffer, final String sender) {
+        if (buffer == null) {
+            return null;
+        }
+        if (buffer.position() != 0) {
+            buffer.flip();
+        }
+        byte bytes[] = new byte[buffer.limit()];
+        buffer.get(bytes, 0, buffer.limit());
+        return parseEvent(bytes, sender);
+    }
+
+    /**
+     * Parses a SyslogEvent from a byte array.
+     *
+     * @param bytes a byte array containing a syslog message
+     * @param sender the hostname of the syslog server that sent the message
+     * @return a SyslogEvent parsed from the byte array
+     */
+    public SyslogEvent parseEvent(final byte[] bytes, final String sender) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+
+        // remove trailing new line before parsing
+        int length = bytes.length;
+        if (bytes[length - 1] == '\n') {
+            length = length - 1;
+        }
+
+        final String message = new String(bytes, 0, length, charset);
+
+        final SyslogEvent.Builder builder = new SyslogEvent.Builder()
+                .valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
+
+        for (Pattern pattern : MESSAGE_PATTERNS) {
+            final Matcher matcher = pattern.matcher(message);
+            if (!matcher.matches()) {
+                continue;
+            }
+
+            final MatchResult res = matcher.toMatchResult();
+            for (int grp = 1; grp <= res.groupCount(); grp++) {
+                String value = res.group(grp);
+                if (grp == SYSLOG_TIMESTAMP_POS) {
+                    builder.timestamp(value);
+                } else if (grp == SYSLOG_HOSTNAME_POS) {
+                    builder.hostname(value);
+                } else if (grp == SYSLOG_PRIORITY_POS) {
+                    int pri = Integer.parseInt(value);
+                    int sev = pri % 8;
+                    int facility = pri / 8;
+                    builder.priority(value);
+                    builder.severity(String.valueOf(sev));
+                    builder.facility(String.valueOf(facility));
+                } else if (grp == SYSLOG_VERSION_POS) {
+                    builder.version(value);
+                } else if (grp == SYSLOG_BODY_POS) {
+                    builder.msgBody(value);
+                }
+            }
+
+            builder.valid(true);
+            break;
+        }
+
+        // either invalid w/original msg, or fully parsed event
+        return builder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8507c96..60379ed 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -44,6 +44,7 @@ org.apache.nifi.processors.standard.InvokeHTTP
 org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
 org.apache.nifi.processors.standard.ListenHTTP
+org.apache.nifi.processors.standard.ListenSyslog
 org.apache.nifi.processors.standard.ListenUDP
 org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
@@ -58,6 +59,7 @@ org.apache.nifi.processors.standard.PutFTP
 org.apache.nifi.processors.standard.PutJMS
 org.apache.nifi.processors.standard.PutSFTP
 org.apache.nifi.processors.standard.PutSQL
+org.apache.nifi.processors.standard.PutSyslog
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping
 org.apache.nifi.processors.standard.RouteOnAttribute

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
new file mode 100644
index 0000000..0e0d972
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+public class TestListenSyslog {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class);
+    // Pieces of the valid test message; checkFlowFile() expects them back as FlowFile attributes.
+    static final String PRI = "34"; // 34 == 4 * 8 + 2, i.e. facility 4 and severity 2
+    static final String SEV = "2"; // expected severity: 34 % 8
+    static final String FAC = "4"; // expected facility: 34 / 8
+    static final String TIME = "Oct 13 15:43:23";
+    static final String HOST = "localhost.home";
+    static final String BODY = "some message";
+    // Newline-terminated payloads handed to the sender threads below.
+    static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
+    static final String INVALID_MESSAGE = "this is not valid\n";
+    /** Sends 20 valid datagrams to a UDP listener on a random port and expects 20 FlowFiles on ListenSyslog.REL_SUCCESS. */
+    @Test
+    public void testUDP() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule the processor by hand (port "0") so it starts listening on a random port we can read back
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some UDP messages to the port in the background, pausing 10 ms after each
+        final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all datagrams, or 30 seconds passed
+        try {
+            int numTransfered = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile);
+
+        } finally {
+            // unschedule to close connections, even when an assertion above failed
+            proc.onUnscheduled();
+        }
+    }
+    /** Sends 20 valid messages over one TCP connection and expects 20 FlowFiles on REL_SUCCESS; the first one is checked in detail. */
+    @Test
+    public void testTCPSingleConnection() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule the processor by hand (port "0") so it starts listening on a random port we can read back
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some TCP messages to the port in the background, all on a single connection, 10 ms apart
+        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int numTransfered = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
+            // only the first FlowFile's content and attributes are verified
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile);
+        } finally {
+            // unschedule to close connections, even when an assertion above failed
+            proc.onUnscheduled();
+        }
+    }
+    /** Sends 20 valid messages, each over its own TCP connection, and expects 20 FlowFiles on REL_SUCCESS; the first one is checked in detail. */
+    @Test
+    public void testTCPMultipleConnection() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule the processor by hand (port "0") so it starts listening on a random port we can read back
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some TCP messages to the port in the background, opening a new connection per message
+        final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int numTransfered = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
+            // only the first FlowFile's content and attributes are verified
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile);
+        } finally {
+            // unschedule to close connections, even when an assertion above failed
+            proc.onUnscheduled();
+        }
+    }
+    /** Sends 10 messages without a syslog header over one TCP connection and expects 10 FlowFiles on REL_INVALID. */
+    @Test
+    public void testInvalid() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule the processor by hand (port "0") so it starts listening on a random port we can read back
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 10;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some invalid TCP messages to the port in the background, 100 ms apart on a single connection
+        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int numTransfered = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(50);
+                proc.onTrigger(context, processSessionFactory);
+                numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
+            }
+            Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
+
+        } finally {
+            // unschedule to close connections, even when the assertion above failed
+            proc.onUnscheduled();
+        }
+    }
+    /** Queues one event whose getRawMessage() always throws plus one valid event, runs the processor three times, and expects no transfers. */
+    @Test
+    public void testErrorQueue() {
+        final SyslogEvent event1 = Mockito.mock(SyslogEvent.class);
+        Mockito.when(event1.getRawMessage()).thenThrow(new ProcessException("ERROR"));
+        // event2 is a well-formed event; it is queued after event1 by MockProcessor
+        final SyslogEvent event2 = new SyslogEvent.Builder()
+                .facility("fac").severity("sev")
+                .fullMessage("abc").hostname("host")
+                .msgBody("body").timestamp("123").valid(true)
+                .rawMessage("abc".getBytes(Charset.forName("UTF-8")))
+                .build();
+
+        final MockProcessor proc = new MockProcessor(Arrays.asList(event1, event2));
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PORT, "12345");
+        // NOTE(review): presumably event1 fails on every trigger and is retried ahead of event2 - confirm against ListenSyslog.onTrigger
+        // should keep re-processing event1 from the error queue
+        runner.run(3);
+        runner.assertTransferCount(ListenSyslog.REL_INVALID, 0);
+        runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
+    }
+
+
+    private void checkFlowFile(MockFlowFile flowFile) {
+        flowFile.assertContentEquals(VALID_MESSAGE);
+        Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
+        Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
+        Assert.assertEquals(FAC, flowFile.getAttribute(ListenSyslog.SyslogAttributes.FACILITY.key()));
+        Assert.assertEquals(TIME, flowFile.getAttribute(ListenSyslog.SyslogAttributes.TIMESTAMP.key()));
+        Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
+        Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
+        Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+    }
+
+    /**
+     * Runnable that sends {@code numMessages} copies of {@code message} as UDP datagrams to localhost:{@code port}; I/O errors and interrupts are only logged.
+     */
+    public static final class DatagramSender implements Runnable {
+
+        final int port; // destination port on localhost
+        final int numMessages; // how many times the message is sent
+        final long delay; // pause after each send, passed to Thread.sleep (milliseconds)
+        final String message; // payload, encoded as UTF-8 before sending
+
+        public DatagramSender(int port, int numMessages, long delay, String message) {
+            this.port = port;
+            this.numMessages = numMessages;
+            this.delay = delay;
+            this.message = message;
+        }
+
+        @Override
+        public void run() {
+            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+            // one channel is reused for every datagram and closed by try-with-resources
+            try (DatagramChannel channel = DatagramChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", port));
+                for (int i=0; i < numMessages; i++) {
+                    buffer.clear();
+                    buffer.put(bytes);
+                    buffer.flip();
+                    // keep writing until the buffer has been drained
+                    while(buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+
+                    Thread.sleep(delay);
+                }
+            } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                LOGGER.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Runnable that opens one TCP connection to localhost:{@code port} and writes {@code message} to it {@code numMessages} times; errors are only logged.
+     */
+    public static final class SingleConnectionSocketSender implements Runnable {
+
+        final int port; // destination port on localhost
+        final int numMessages; // how many times the message is written
+        final long delay; // pause after each write, passed to Thread.sleep (milliseconds)
+        final String message; // payload, encoded as UTF-8 before sending
+
+        public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) {
+            this.port = port;
+            this.numMessages = numMessages;
+            this.delay = delay;
+            this.message = message;
+        }
+
+        @Override
+        public void run() {
+            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+            // the same connection carries every message and is closed by try-with-resources
+            try (SocketChannel channel = SocketChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", port));
+
+                for (int i=0; i < numMessages; i++) {
+                    buffer.clear();
+                    buffer.put(bytes);
+                    buffer.flip();
+                    // keep writing until the buffer has been drained
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(delay);
+                }
+            } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                LOGGER.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Runnable that writes {@code message} to localhost:{@code port} {@code numMessages} times, opening a new TCP connection for each; errors are only logged.
+     */
+    public static final class MultiConnectionSocketSender implements Runnable {
+
+        final int port; // destination port on localhost
+        final int numMessages; // how many connections are opened, one message each
+        final long delay; // pause after each write, passed to Thread.sleep (milliseconds)
+        final String message; // payload, encoded as UTF-8 before sending
+
+        public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) {
+            this.port = port;
+            this.numMessages = numMessages;
+            this.delay = delay;
+            this.message = message;
+        }
+
+        @Override
+        public void run() {
+            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+            // the try/catch sits inside the loop: a failure (or interrupt) is logged and the next message is still attempted
+            for (int i=0; i < numMessages; i++) {
+                try (SocketChannel channel = SocketChannel.open()) {
+                    channel.connect(new InetSocketAddress("localhost", port));
+
+                    buffer.clear();
+                    buffer.put(bytes);
+                    buffer.flip();
+                    // keep writing until the buffer has been drained
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(delay); // the sleep is inside try-with-resources, so the connection closes only afterwards
+                } catch (IOException e) {
+                    LOGGER.error(e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    LOGGER.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    // A mock version of ListenSyslog that will queue the provided events instead of reading them from a socket
+    private static class MockProcessor extends ListenSyslog {
+        // events offered to the processor's queue each time the reader's run() is invoked
+        private List<SyslogEvent> eventList;
+
+        public MockProcessor(List<SyslogEvent> eventList) {
+            this.eventList = eventList;
+        }
+        // Returns a ChannelReader that ignores protocol, bufferPool and syslogParser and only feeds syslogEvents.
+        @Override
+        protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+            return new ChannelReader() {
+                @Override
+                public void open(int port, int maxBufferSize) throws IOException {
+                    // intentionally empty: no socket is opened
+                }
+
+                @Override
+                public int getPort() {
+                    return 0;
+                }
+
+                @Override
+                public void stop() {
+                    // intentionally empty: nothing to stop
+                }
+
+                @Override
+                public void close() {
+                    // intentionally empty: nothing to close
+                }
+
+                @Override
+                public void run() {
+                    for (SyslogEvent event : eventList) {
+                        syslogEvents.offer(event);
+                    }
+                }
+            };
+        }
+    }
+
+}


[4/4] nifi git commit: NIFI-274 - Fixing TestListenSyslog, fixing default buffer size to be bytes, adding syslog.protocol to attributes - Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections

Posted by tk...@apache.org.
NIFI-274 - Fixing TestListenSyslog, fixing default buffer size to be bytes, adding syslog.protocol to attributes
         - Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections
         - Adding @InputRequirement to processors and adding appropriate send and receive provenance events


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/618f22e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/618f22e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/618f22e1

Branch: refs/heads/master
Commit: 618f22e110032df09d77a80587b553e6995038e2
Parents: 5611dac
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Nov 4 09:01:52 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 4 18:01:59 2015 -0500

----------------------------------------------------------------------
 .../standard/AbstractSyslogProcessor.java       |  4 +-
 .../nifi/processors/standard/ListenSyslog.java  | 36 +++++++++++----
 .../nifi/processors/standard/PutSyslog.java     | 15 +++++-
 .../processors/standard/TestListenSyslog.java   | 48 ++++++++++++++++++--
 .../nifi/processors/standard/TestPutSyslog.java | 18 ++++++++
 5 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index f7d5eeb..fea01bd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -64,7 +64,9 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
         HOSTNAME("syslog.hostname"),
         SENDER("syslog.sender"),
         BODY("syslog.body"),
-        VALID("syslog.valid");
+        VALID("syslog.valid"),
+        PROTOCOL("syslog.protocol"),
+        PORT("syslog.port"); // key must match the "syslog.port" attribute that ListenSyslog advertises via @WritesAttribute
 
         private String key;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 8012b88..1660e3a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -68,7 +70,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
-
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
         "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@ -88,6 +90,8 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
                     @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
                     @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
                             "If this value is false, the other attributes will be empty and only the original message will be available in the content."),
+                    @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."),
+                    @WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."),
                     @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
@@ -97,7 +101,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
                     "from an incoming connection until the buffer is full, or the connection is closed. ")
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("65507 KB")
+            .defaultValue("65507 B")
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
@@ -110,8 +114,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
-            .name("Max number of TCP connections")
-            .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+            .name("Max Number of TCP Connections")
+            .description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .defaultValue("2")
             .required(true)
@@ -142,8 +146,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(CHARSET);
         descriptors.add(MAX_CONNECTIONS);
+        descriptors.add(CHARSET);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -184,7 +188,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         if (protocol.equals(UDP_VALUE.getValue())) {
             maxConnections = 1;
-        } else{
+        } else {
             maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
         }
 
@@ -240,6 +244,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
 
         final SyslogEvent event = initialEvent;
+        final String port = context.getProperty(PORT).getValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue();
 
         final Map<String,String> attributes = new HashMap<>();
         attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
@@ -251,11 +257,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
         attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
         attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
+        attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
+        attributes.put(SyslogAttributes.PORT.key(), port);
         attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
         FlowFile flowFile = session.create();
         flowFile = session.putAllAttributes(flowFile, attributes);
 
+        final String transitUri = new StringBuilder().append(protocol).append("://").append(event.getSender())
+                .append(":").append(port).toString();
+
         try {
             // write the raw bytes of the message as the FlowFile content
             flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -268,6 +279,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             if (event.isValid()) {
                 getLogger().info("Transferring {} to success", new Object[]{flowFile});
                 session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().receive(flowFile, transitUri);
             } else {
                 getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
                 session.transfer(flowFile, REL_INVALID);
@@ -454,7 +466,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 // Check for available connections
                                 if (currentConnections.incrementAndGet() > maxConnections){
                                     currentConnections.decrementAndGet();
-                                    logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+                                    logger.warn("Rejecting connection from {} because max connections has been met",
+                                            new Object[]{ socketChannel.getRemoteAddress().toString() });
                                     IOUtils.closeQuietly(socketChannel);
                                     continue;
                                 }
@@ -494,8 +507,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         public int getPort() {
             // Return the port for the key listening for accepts
             for(SelectionKey key : selector.keys()){
-                if (key.isValid() && key.isAcceptable()) {
-                    return ((SocketChannel)key.channel()).socket().getLocalPort();
+                if (key.isValid()) {
+                    final Channel channel = key.channel();
+                    if (channel instanceof  ServerSocketChannel) {
+                        return ((ServerSocketChannel)channel).socket().getLocalPort();
+                    }
                 }
             }
             return 0;
@@ -619,7 +635,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
-             // Treat same as closed socket
+                // Treat same as closed socket
                 eof = true;
             } finally {
                 if(eof == true) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 5e558ca..b7e7a9c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,6 +34,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @TriggerWhenEmpty
 @Tags({"syslog", "put", "udp", "tcp", "logs"})
 @CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " +
@@ -59,7 +62,7 @@ import java.util.regex.Pattern;
         "(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional.  The constructed messages are checked against regular expressions for " +
         "RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
         "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
-        "above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " +
+        "above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, " +
         "failures routed to the failure relationship.")
 public class PutSyslog extends AbstractSyslogProcessor {
 
@@ -277,9 +280,14 @@ public class PutSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        final String port = context.getProperty(PORT).getValue();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
         final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
+
         try {
             for (FlowFile flowFile : flowFiles) {
+                final StopWatch timer = new StopWatch(true);
                 final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
                 final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
                 final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
@@ -304,6 +312,11 @@ public class PutSyslog extends AbstractSyslogProcessor {
                         }
 
                         sender.send(messageBuilder.toString());
+                        timer.stop();
+
+                        final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+                        session.getProvenanceReporter().send(flowFile, transitUri, duration, true);
+
                         getLogger().info("Transferring {} to success", new Object[]{flowFile});
                         session.transfer(flowFile, REL_SUCCESS);
                     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index eb71f88..f0eb345 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.io.nio.BufferPool;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -89,7 +92,16 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
 
             MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
 
         } finally {
             // unschedule to close connections
@@ -131,7 +143,17 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
 
             MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -143,6 +165,7 @@ public class TestListenSyslog {
         final ListenSyslog proc = new ListenSyslog();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
         runner.setProperty(ListenSyslog.PORT, "0");
 
         // schedule to start listening on a random port
@@ -172,7 +195,17 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
 
             MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -210,6 +243,8 @@ public class TestListenSyslog {
                 proc.onTrigger(context, processSessionFactory);
                 numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
             }
+
+            // all messages should be transferred to invalid
             Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
 
         } finally {
@@ -241,7 +276,7 @@ public class TestListenSyslog {
     }
 
 
-    private void checkFlowFile(MockFlowFile flowFile) {
+    private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
         flowFile.assertContentEquals(VALID_MESSAGE);
         Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
         Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -250,6 +285,9 @@ public class TestListenSyslog {
         Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
         Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
         Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+        Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+        Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
+        Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
     }
 
     /**
@@ -392,7 +430,7 @@ public class TestListenSyslog {
 
         @Override
         protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
-                final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
+                                                    final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
             return new ChannelReader() {
                 @Override
                 public void open(int port, int maxBufferSize) throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
index 40a9123..eb0d3f4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -70,6 +72,14 @@ public class TestPutSyslog {
         runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
         Assert.assertEquals(1, sender.messages.size());
         Assert.assertEquals(expectedMessage, sender.messages.get(0));
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        Assert.assertEquals("UDP://localhost:12345", event.getTransitUri());
     }
 
     @Test
@@ -95,6 +105,14 @@ public class TestPutSyslog {
         runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
         Assert.assertEquals(1, sender.messages.size());
         Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        Assert.assertEquals("TCP://localhost:12345", event.getTransitUri());
     }
 
     @Test