You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:04:29 UTC

[40/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made all changes identified by adam, mark, joey to prep for a cleaner build

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
deleted file mode 100644
index 2ae2c07..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Long-running task that alternately polls two selectors: one holding the
- * registered ServerSocketChannels (used to accept new TCP connections) and one
- * holding the SocketChannels and DatagramChannels. For every newly accepted
- * SocketChannel and every DatagramChannel without an attachment, a reader is
- * created, scheduled with fixed delay on the supplied executor and attached to
- * the channel's SelectionKey.
- *
- * @author none
- */
-public final class ChannelDispatcher implements Runnable {
-
-    public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
-    private final Selector serverSocketSelector;
-    private final Selector socketChannelSelector;
-    private final ScheduledExecutorService executor;
-    private final BufferPool emptyBuffers;
-    private final StreamConsumerFactory factory;
-    private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
-    private final long timeout;
-    private volatile boolean stop = false;
-
-    /**
-     * @param serverSocketSelector selector the ServerSocketChannels are registered with
-     * @param socketChannelSelector selector the SocketChannels and DatagramChannels are registered with
-     * @param service executor on which the created readers are scheduled
-     * @param factory consumer factory handed to every reader that is created
-     * @param buffers buffer pool handed to every reader that is created
-     * @param timeout maximum time each select call may block
-     * @param unit unit of <code>timeout</code>; the value is stored in milliseconds
-     */
-    public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
-            final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
-        this.serverSocketSelector = serverSocketSelector;
-        this.socketChannelSelector = socketChannelSelector;
-        this.executor = service;
-        this.factory = factory;
-        emptyBuffers = buffers;
-        this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
-    }
-
-    /**
-     * Sets the fixed delay used when scheduling readers created after this
-     * call. Readers that were already scheduled keep their delay.
-     *
-     * @param period delay between reader executions
-     * @param timeUnit unit of <code>period</code>
-     */
-    public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
-        channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit));
-    }
-
-    /**
-     * Polls both selectors until {@link #stop()} is called. Any exception from
-     * a polling pass is logged and the loop continues.
-     */
-    @Override
-    public void run() {
-        while (!stop) {
-            try {
-                selectServerSocketKeys();
-                selectSocketChannelKeys();
-            } catch (final Exception ex) {
-                LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex});
-            }
-        }
-    }
-
-    /*
-     * When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all
-     * channels' keys.
-     *
-     * @throws IOException
-     */
-    private void selectServerSocketKeys() throws IOException {
-        final int numSelected = serverSocketSelector.select(timeout);
-        if (numSelected == 0) {
-            return;
-        }
-
-        // for each registered server socket - see if any connections are waiting to be established
-        final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator();
-        while (itr.hasNext()) {
-            final SelectionKey serverSocketKey = itr.next();
-            final SelectableChannel channel = serverSocketKey.channel();
-            AbstractChannelReader reader = null;
-            try {
-                if (serverSocketKey.isValid() && serverSocketKey.isAcceptable()) {
-                    reader = acceptConnection((ServerSocketChannel) channel);
-                }
-            } finally {
-                // do this so that the next select operation returns a positive value; otherwise, it will return 0.
-                // Done in a finally block so a failed accept does not leave the key stuck in the selected set.
-                itr.remove();
-            }
-            if (reader != null && LOGGER.isDebugEnabled()) {
-                LOGGER.debug(this + " New Connection established.  Server channel: " + channel + " Reader: " + reader);
-            }
-        }
-    }
-
-    /*
-     * Accepts one pending connection on the given server channel, registers the new channel for reads with the
-     * socketChannelSelector and schedules a reader for it. Returns null when no connection was pending. If any step
-     * after the accept fails, the accepted channel is closed before the failure propagates so that it is not leaked.
-     *
-     * @throws IOException
-     */
-    private AbstractChannelReader acceptConnection(final ServerSocketChannel ssChannel) throws IOException {
-        final SocketChannel sChannel = ssChannel.accept();
-        if (sChannel == null) {
-            return null;
-        }
-        boolean handedOff = false;
-        try {
-            sChannel.configureBlocking(false);
-            final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
-            final String readerId = sChannel.socket().toString();
-            final AbstractChannelReader reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory);
-            final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L,
-                    channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS);
-            reader.setScheduledFuture(readerFuture);
-            socketChannelKey.attach(reader);
-            handedOff = true;
-            return reader;
-        } finally {
-            if (!handedOff) {
-                closeQuietly(sChannel);
-            }
-        }
-    }
-
-    /*
-     * Closes the channel, ignoring any IOException; used only on failure paths where the original exception is the
-     * one that must propagate.
-     */
-    private static void closeQuietly(final SocketChannel channel) {
-        try {
-            channel.close();
-        } catch (final IOException ignored) {
-            // best effort - the failure that triggered the close is already propagating
-        }
-    }
-
-    /*
-     * When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors
-     * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that
-     * the selected key set is not manually changed via a remove operation.
-     *
-     * @throws IOException
-     */
-    private void selectSocketChannelKeys() throws IOException {
-        // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
-        // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
-        final int numSelected = socketChannelSelector.select(timeout);
-        if (numSelected == 0) {
-            return;
-        }
-
-        for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
-            final SelectableChannel channel = socketChannelKey.channel();
-            AbstractChannelReader reader = null;
-            // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
-            // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
-            // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
-            // way to tell if it's new is the lack of an attachment.
-            if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
-                reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
-                final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
-                        TimeUnit.MILLISECONDS);
-                reader.setScheduledFuture(readerFuture);
-                // attach last, as the TCP path does: the key only ever carries a reader that was scheduled and has
-                // its future set, and a failed schedule leaves the key unattached so a later pass can retry.
-                socketChannelKey.attach(reader);
-            }
-            if (reader != null && LOGGER.isDebugEnabled()) {
-                LOGGER.debug(this + " New Connection established.  Server channel: " + channel + " Reader: " + reader);
-            }
-        }
-
-    }
-
-    /**
-     * Requests that {@link #run()} exit; the loop checks the flag between
-     * polling passes.
-     */
-    public void stop() {
-        stop = true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
deleted file mode 100644
index b0a1cfb..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides the entry point to NIO based socket listeners for NiFi
- * processors and services. There are 2 supported types of Listeners, Datagram
- * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
- * will create the ChannelDispatcher, which is a Runnable and is controlled via
- * the ScheduledExecutorService, which is also created by this class. The
- * ChannelDispatcher handles connections to the ServerSocketChannels and creates
- * the readers associated with the resulting SocketChannels. Additionally, this
- * creates and manages two Selectors, one for ServerSocketChannels and another
- * for SocketChannels and DatagramChannels.
- *
- * The threading model for this consists of one thread for the
- * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
- * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
- * fixed delay as the others are. It is throttled by the provided timeout value.
- * Within the ChannelDispatcher there are two blocking operations which will
- * block for the given timeout each time through the enclosing loop.
- *
- * All channels are cached in one of the two Selectors via their SelectionKey.
- * The serverSocketSelector maintains all the added ServerSocketChannels; the
- * socketChannelSelector maintains all the added DatagramChannels and the
- * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
- * the SocketChannel is injected with the channel's associated reader.
- *
- * All ChannelReaders will get throttled by the unavailability of buffers in the
- * provided BufferPool. This is designed to create back pressure.
- *
- * @author none
- */
-public final class ChannelListener {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
-    private final ScheduledExecutorService executor;
-    private final Selector serverSocketSelector; // used to listen for new connections
-    private final Selector socketChannelSelector; // used to listen on existing connections
-    private final ChannelDispatcher channelDispatcher;
-    private final BufferPool bufferPool;
-    private final int initialBufferPoolSize;
-    private volatile long channelReaderFrequencyMSecs = 50;
-
-    /**
-     * Opens both selectors, creates the dispatcher and schedules it to start
-     * after 50 milliseconds.
-     *
-     * @param threadPoolSize number of executor threads; one extra thread is
-     * added for the dispatcher
-     * @param consumerFactory factory handed to the dispatcher
-     * @param bufferPool buffer pool handed to the dispatcher; its size at
-     * construction time is remembered and compared again at shutdown
-     * @param timeout select timeout handed to the dispatcher
-     * @param unit unit of <code>timeout</code>
-     * @throws IOException if a selector cannot be opened
-     */
-    public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
-            TimeUnit unit) throws IOException {
-        this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
-        this.serverSocketSelector = Selector.open();
-        this.socketChannelSelector = Selector.open();
-        this.bufferPool = bufferPool;
-        this.initialBufferPoolSize = bufferPool.size();
-        channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
-                timeout, unit);
-        executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Sets the scheduling period passed on to the dispatcher for readers it
-     * creates; the same period is used as the polling interval while waiting
-     * for readers to close during shutdown.
-     *
-     * @param period the period
-     * @param unit unit of <code>period</code>
-     */
-    public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) {
-        channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit);
-        channelDispatcher.setChannelReaderFrequency(period, unit);
-    }
-
-    /**
-     * Adds a server socket channel for listening to connections. If any step
-     * after opening the channel fails, the channel is closed before the
-     * exception propagates.
-     *
-     * @param nicIPAddress - if null binds to wildcard address
-     * @param port - port to bind to
-     * @param receiveBufferSize - size of OS receive buffer to request. If not
-     * greater than 0 then will not be set and OS default will win.
-     * @throws IOException
-     */
-    public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
-            throws IOException {
-        final ServerSocketChannel ssChannel = ServerSocketChannel.open();
-        boolean registered = false;
-        try {
-            ssChannel.configureBlocking(false);
-            if (receiveBufferSize > 0) {
-                ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-                final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
-                if (actualReceiveBufSize < receiveBufferSize) {
-                    LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to "
-                            + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
-                            + " bytes. You may want to consider changing the Operating System's "
-                            + "maximum receive buffer");
-                }
-            }
-            ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
-            ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
-            ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
-            registered = true;
-        } finally {
-            if (!registered) {
-                IOUtils.closeQuietly(ssChannel); // do not leak the channel (and possibly the bound port) on failure
-            }
-        }
-    }
-
-    /**
-     * Binds to listen for datagrams on the given local IPAddress/port
-     *
-     * @param nicIPAddress - if null will listen on wildcard address, which
-     * means datagrams will be received on all local network interfaces.
-     * Otherwise, will bind to the provided IP address associated with some NIC.
-     * @param port - the port to listen on
-     * @param receiveBufferSize - the number of bytes to request for a receive
-     * buffer from OS
-     * @throws IOException
-     */
-    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
-            throws IOException {
-        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
-        boolean registered = false;
-        try {
-            dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
-            registered = true;
-        } finally {
-            if (!registered) {
-                IOUtils.closeQuietly(dChannel);
-            }
-        }
-    }
-
-    /**
-     * Binds to listen for datagrams on the given local IPAddress/port and
-     * restricts receipt of datagrams to those from the provided host and port,
-     * must specify both. This improves performance for datagrams coming from a
-     * sender that is known a-priori. If either <code>sendingHost</code> or
-     * <code>sendingPort</code> is null this behaves exactly like
-     * {@link #addDatagramChannel(InetAddress, int, int)}.
-     *
-     * @param nicIPAddress - if null will listen on wildcard address, which
-     * means datagrams will be received on all local network interfaces.
-     * Otherwise, will bind to the provided IP address associated with some NIC.
-     * @param port - the port to listen on. This is used to provide a well-known
-     * destination for a sender.
-     * @param receiveBufferSize - the number of bytes to request for a receive
-     * buffer from OS
-     * @param sendingHost - the hostname, or IP address, of the sender of
-     * datagrams. Only datagrams from this host will be received.
-     * @param sendingPort - the port used by the sender of datagrams. Only
-     * datagrams from this port will be received.
-     * @throws IOException
-     */
-    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
-            final Integer sendingPort) throws IOException {
-
-        if (sendingHost == null || sendingPort == null) {
-            addDatagramChannel(nicIPAddress, port, receiveBufferSize);
-            return;
-        }
-        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
-        boolean registered = false;
-        try {
-            dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
-            dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
-            registered = true;
-        } finally {
-            if (!registered) {
-                IOUtils.closeQuietly(dChannel);
-            }
-        }
-    }
-
-    /*
-     * Opens a non-blocking DatagramChannel, applies the requested receive buffer size (when greater than 0) and
-     * SO_REUSEADDR, and binds it. The channel is closed if any of these steps fails.
-     */
-    private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
-            throws IOException {
-        final DatagramChannel dChannel = DatagramChannel.open();
-        boolean bound = false;
-        try {
-            dChannel.configureBlocking(false);
-            if (receiveBufferSize > 0) {
-                dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-                final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
-                if (actualReceiveBufSize < receiveBufferSize) {
-                    LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to "
-                            + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
-                            + " bytes. You may want to consider changing the Operating System's "
-                            + "maximum receive buffer");
-                }
-            }
-            dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
-            dChannel.bind(new InetSocketAddress(nicIPAddress, port));
-            bound = true;
-            return dChannel;
-        } finally {
-            if (!bound) {
-                IOUtils.closeQuietly(dChannel);
-            }
-        }
-    }
-
-    /**
-     * Stops the dispatcher, cancels every key, waits for attached readers to
-     * report closed, closes all channels and both selectors, and shuts the
-     * executor down. If the calling thread is interrupted while waiting, the
-     * remaining waits are skipped, the rest of the cleanup still runs, and the
-     * thread's interrupt status is restored before returning.
-     *
-     * @param period maximum time to wait for the executor to terminate
-     * @param timeUnit unit of <code>period</code>
-     */
-    public void shutdown(final long period, final TimeUnit timeUnit) {
-        channelDispatcher.stop();
-        boolean interrupted = false;
-        for (SelectionKey selectionKey : socketChannelSelector.keys()) {
-            final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment();
-            selectionKey.cancel();
-            if (reader != null) {
-                while (!interrupted && !reader.isClosed()) {
-                    try {
-                        Thread.sleep(channelReaderFrequencyMSecs);
-                    } catch (final InterruptedException e) {
-                        interrupted = true; // stop waiting but keep cleaning up; flag is restored at the end
-                    }
-                }
-                final ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
-                if (readerFuture != null) {
-                    readerFuture.cancel(false);
-                }
-            }
-            IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist...
-        }
-        IOUtils.closeQuietly(socketChannelSelector);
-
-        for (SelectionKey selectionKey : serverSocketSelector.keys()) {
-            selectionKey.cancel();
-            IOUtils.closeQuietly(selectionKey.channel());
-        }
-        IOUtils.closeQuietly(serverSocketSelector);
-        executor.shutdown();
-        if (!interrupted) {
-            try {
-                executor.awaitTermination(period, timeUnit);
-            } catch (final InterruptedException ex) {
-                interrupted = true;
-                LOGGER.warn("Interrupted while trying to shutdown executor");
-            }
-        }
-        final int currentBufferPoolSize = bufferPool.size();
-        final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
-                + " Current buffer count=" + currentBufferPoolSize
-                + " Could indicate a buffer leak.  Ensure all consumers are executed until they complete." : "";
-        LOGGER.info("Channel listener shutdown. " + warning);
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
deleted file mode 100644
index 1eb5c7e..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-/**
- *
- * @author none
- */
-public final class DatagramChannelReader extends AbstractChannelReader {
-
-    public static final int MAX_UDP_PACKET_SIZE = 65507;
-
-    public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
-        super(id, key, empties, consumerFactory);
-    }
-
-    /**
-     * Will receive UDP data from channel and won't receive anything unless the
-     * given buffer has enough space for at least one full max udp packet.
-     * Receiving continues while that much space remains, the key is valid and
-     * readable, and the channel has a datagram available.
-     *
-     * @param key key whose channel is the DatagramChannel to receive from
-     * @param buffer buffer the datagram payloads are appended to
-     * @return number of bytes added to the buffer by this call
-     * @throws IOException if receiving from the channel fails
-     */
-    @Override
-    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
-        final DatagramChannel dChannel = (DatagramChannel) key.channel();
-        final int initialBufferPosition = buffer.position();
-        // ">=" rather than ">": a buffer with exactly MAX_UDP_PACKET_SIZE bytes free can hold one full packet,
-        // and with ">" a buffer of exactly that capacity would never receive anything.
-        while (buffer.remaining() >= MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
-            if (dChannel.receive(buffer) == null) {
-                break; // no datagram immediately available
-            }
-        }
-        return buffer.position() - initialBufferPosition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
deleted file mode 100644
index db2c102..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-/**
- *
- * @author none
- */
-public final class SocketChannelReader extends AbstractChannelReader {
-
-    public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
-        super(id, key, empties, consumerFactory);
-    }
-
-    /**
-     * Reads TCP data from the key's socket channel into the buffer, repeating
-     * for as long as the key stays valid and readable and each read yields at
-     * least one byte.
-     *
-     * @param key key whose channel is the SocketChannel to read from
-     * @param buffer buffer the data is read into
-     * @return the result of the last read performed (not a running total): 0
-     * if no read was attempted or the last read returned nothing, a negative
-     * value if the last read reported end of stream
-     * @throws IOException if reading from the channel fails
-     */
-    @Override
-    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
-        final SocketChannel socket = (SocketChannel) key.channel();
-        int lastRead = 0;
-        while (true) {
-            if (!key.isValid() || !key.isReadable()) {
-                return lastRead;
-            }
-            lastRead = socket.read(buffer);
-            if (lastRead <= 0) {
-                return lastRead;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
deleted file mode 100644
index fce59c6..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.consumer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-/**
- *
- * @author none
- */
-public abstract class AbstractStreamConsumer implements StreamConsumer {
-
-    private final String uniqueId;
-    // volatile: assigned through setReturnBufferQueue and read by whichever threads add and process buffers
-    private volatile BufferPool bufferPool = null;
-    private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
-    private final AtomicBoolean streamEnded = new AtomicBoolean(false);
-    private final AtomicBoolean consumerEnded = new AtomicBoolean(false);
-
-    /**
-     * @param id identifier of this consumer; it is the sole basis of
-     * {@link #equals(Object)}, {@link #hashCode()} and {@link #toString()}
-     */
-    public AbstractStreamConsumer(final String id) {
-        uniqueId = id;
-    }
-
-    @Override
-    public final void setReturnBufferQueue(final BufferPool returnQueue) {
-        bufferPool = returnQueue;
-    }
-
-    /**
-     * Queues the buffer for processing, or, if this consumer has already
-     * finished, clears it and returns it to the buffer pool immediately.
-     *
-     * @param buffer buffer holding data to process
-     */
-    @Override
-    public final void addFilledBuffer(final ByteBuffer buffer) {
-        if (isConsumerFinished()) {
-            // capture the byte count before clear(), as process() does; after clear() remaining() is the
-            // buffer's full capacity, not the amount of data it carried
-            final int bytesDiscarded = buffer.remaining();
-            buffer.clear();
-            bufferPool.returnBuffer(buffer, bytesDiscarded);
-        } else {
-            filledBuffers.add(buffer);
-        }
-    }
-
-    /**
-     * Performs one unit of work: does nothing if the consumer has finished;
-     * marks the consumer finished and calls {@link #onConsumerDone()} if end of
-     * stream was signalled and no buffers are queued; otherwise processes at
-     * most one queued buffer and returns it, cleared, to the buffer pool even
-     * when processing throws.
-     *
-     * @throws IOException if {@link #processBuffer(ByteBuffer)} fails
-     */
-    @Override
-    public final void process() throws IOException {
-        if (isConsumerFinished()) {
-            return;
-        }
-        if (streamEnded.get() && filledBuffers.isEmpty()) {
-            consumerEnded.set(true);
-            onConsumerDone();
-            return;
-        }
-        final ByteBuffer buffer = filledBuffers.poll();
-        if (buffer != null) {
-            final int bytesToProcess = buffer.remaining();
-            try {
-                processBuffer(buffer);
-            } finally {
-                buffer.clear();
-                bufferPool.returnBuffer(buffer, bytesToProcess);
-            }
-        }
-    }
-
-    /**
-     * Handles the data in one filled buffer. The caller clears the buffer and
-     * returns it to the pool afterwards.
-     *
-     * @param buffer buffer holding the data to handle
-     * @throws IOException if the data cannot be handled
-     */
-    protected abstract void processBuffer(ByteBuffer buffer) throws IOException;
-
-    @Override
-    public final void signalEndOfStream() {
-        streamEnded.set(true);
-    }
-
-    /**
-     * Convenience method that is called when the consumer is done processing
-     * based on being told the signal is end of stream and has processed all
-     * given buffers.
-     */
-    protected void onConsumerDone() {
-    }
-
-    @Override
-    public final boolean isConsumerFinished() {
-        return consumerEnded.get();
-    }
-
-    @Override
-    public final String getId() {
-        return uniqueId;
-    }
-
-    /**
-     * Two consumers are equal when they are of the same class and have the
-     * same id.
-     */
-    @Override
-    public final boolean equals(final Object obj) {
-        if (obj == null) {
-            return false;
-        }
-        if (obj == this) {
-            return true;
-        }
-        if (obj.getClass() != getClass()) {
-            return false;
-        }
-        final AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
-        // no appendSuper(super.equals(obj)): the superclass is Object, whose equals is identity and is always
-        // false at this point, which made distinct consumers with the same id unequal
-        return new EqualsBuilder().append(uniqueId, rhs.uniqueId).isEquals();
-    }
-
-    @Override
-    public final int hashCode() {
-        return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
-    }
-
-    @Override
-    public final String toString() {
-        return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
deleted file mode 100644
index d75b7d7..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.consumer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.nifi.io.nio.BufferPool;
-
-/**
- * A StreamConsumer must be thread safe. It may be accessed concurrently by a
- * thread providing data to process and another thread that is processing that
- * data.
- *
- * @author none
- */
-public interface StreamConsumer {
-
-    /**
-     * Will be called once just after construction. It provides the queue to
-     * which processed and emptied and cleared buffers must be returned. For
-     * each time <code>addFilledBuffer</code> is called there should be an
-     * associated add to this given queue. If not, buffers will run out and all
-     * stream processing will halt. READ THIS!!!
-     *
-     * @param returnQueue
-     */
-    void setReturnBufferQueue(BufferPool returnQueue);
-
-    /**
-     * Will be called by the thread that produces byte buffers with available
-     * data to be processed. If the consumer is finished this should simply
-     * return the given buffer to the return buffer queue (after it is cleared)
-     *
-     * @param buffer
-     */
-    void addFilledBuffer(ByteBuffer buffer);
-
-    /**
-     * Will be called by the thread that executes the consumption of data. May
-     * be called many times though once <code>isConsumerFinished</code> returns
-     * true this method will likely do nothing.
-     * @throws java.io.IOException
-     */
-    void process() throws IOException;
-
-    /**
-     * Called once the end of the input stream is detected
-     */
-    void signalEndOfStream();
-
-    /**
-     * If true signals the consumer is done consuming data and will not process
-     * any more buffers.
-     *
-     * @return
-     */
-    boolean isConsumerFinished();
-
-    /**
-     * Uniquely identifies the consumer
-     *
-     * @return
-     */
-    String getId();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
deleted file mode 100644
index df298d5..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.consumer;
-
-/**
- *
- * @author none
- */
-public interface StreamConsumerFactory {
-
-    StreamConsumer newInstance(String streamId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
deleted file mode 100644
index 7ed5ad4..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.nifi.util.NiFiProperties;
-
-public class SSLContextFactory {
-
-    private final String keystore;
-    private final char[] keystorePass;
-    private final String keystoreType;
-    private final String truststore;
-    private final char[] truststorePass;
-    private final String truststoreType;
-
-    private final KeyManager[] keyManagers;
-    private final TrustManager[] trustManagers;
-
-    public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
-        keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
-        keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
-        keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);
-
-        truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
-        truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
-        truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);
-
-        // prepare the keystore
-        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
-        keyStore.load(new FileInputStream(keystore), keystorePass);
-        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        keyManagerFactory.init(keyStore, keystorePass);
-
-        // prepare the truststore
-        final KeyStore trustStore = KeyStore.getInstance(truststoreType);
-        trustStore.load(new FileInputStream(truststore), truststorePass);
-        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        trustManagerFactory.init(trustStore);
-
-        keyManagers = keyManagerFactory.getKeyManagers();
-        trustManagers = trustManagerFactory.getTrustManagers();
-    }
-
-    private static char[] getPass(final String password) {
-        return password == null ? null : password.toCharArray();
-    }
-
-    /**
-     * Creates a SSLContext instance using the given information.
-     *
-     *
-     * @return a SSLContext instance
-     * @throws java.security.KeyStoreException
-     * @throws java.io.IOException
-     * @throws java.security.NoSuchAlgorithmException
-     * @throws java.security.cert.CertificateException
-     * @throws java.security.UnrecoverableKeyException
-     * @throws java.security.KeyManagementException
-     */
-    public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
-            UnrecoverableKeyException, KeyManagementException {
-
-        // initialize the ssl context
-        final SSLContext sslContext = SSLContext.getInstance("TLS");
-        sslContext.init(keyManagers, trustManagers, new SecureRandom());
-        sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
-
-        return sslContext;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
deleted file mode 100644
index fc279fb..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.SSLContext;
-
-/**
- * @author unattributed
- */
-public final class ServerSocketConfiguration {
-
-    private boolean needClientAuth;
-    private Integer socketTimeout;
-    private Boolean reuseAddress;
-    private Integer receiveBufferSize;
-    private SSLContextFactory sslContextFactory;
-
-    public ServerSocketConfiguration() {
-    }
-
-    public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
-        return sslContextFactory == null ? null : sslContextFactory.createSslContext();
-    }
-
-    public void setSSLContextFactory(final SSLContextFactory sslContextFactory) {
-        this.sslContextFactory = sslContextFactory;
-    }
-
-    public Integer getSocketTimeout() {
-        return socketTimeout;
-    }
-
-    public void setSocketTimeout(Integer socketTimeout) {
-        this.socketTimeout = socketTimeout;
-    }
-
-    public boolean getNeedClientAuth() {
-        return needClientAuth;
-    }
-
-    public void setNeedClientAuth(boolean needClientAuth) {
-        this.needClientAuth = needClientAuth;
-    }
-
-    public Boolean getReuseAddress() {
-        return reuseAddress;
-    }
-
-    public void setReuseAddress(Boolean reuseAddress) {
-        this.reuseAddress = reuseAddress;
-    }
-
-    public Integer getReceiveBufferSize() {
-        return receiveBufferSize;
-    }
-
-    public void setReceiveBufferSize(Integer receiveBufferSize) {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
deleted file mode 100644
index c24b540..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.SSLContext;
-
-/**
- * @author unattributed
- */
-public final class SocketConfiguration {
-
-    private Integer socketTimeout;
-    private Integer receiveBufferSize;
-    private Integer sendBufferSize;
-    private Boolean reuseAddress;
-    private Boolean keepAlive;
-    private Boolean oobInline;
-    private Boolean tcpNoDelay;
-    private Integer trafficClass;
-    private SSLContextFactory sslContextFactory;
-
-    public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
-        return sslContextFactory == null ? null : sslContextFactory.createSslContext();
-    }
-
-    public void setSSLContextFactory(final SSLContextFactory sslContextFactory) {
-        this.sslContextFactory = sslContextFactory;
-    }
-
-    public Integer getSocketTimeout() {
-        return socketTimeout;
-    }
-
-    public void setSocketTimeout(Integer socketTimeout) {
-        this.socketTimeout = socketTimeout;
-    }
-
-    public Boolean getReuseAddress() {
-        return reuseAddress;
-    }
-
-    public void setReuseAddress(Boolean reuseAddress) {
-        this.reuseAddress = reuseAddress;
-    }
-
-    public Boolean getKeepAlive() {
-        return keepAlive;
-    }
-
-    public void setKeepAlive(Boolean keepAlive) {
-        this.keepAlive = keepAlive;
-    }
-
-    public Boolean getOobInline() {
-        return oobInline;
-    }
-
-    public void setOobInline(Boolean oobInline) {
-        this.oobInline = oobInline;
-    }
-
-    public Integer getReceiveBufferSize() {
-        return receiveBufferSize;
-    }
-
-    public void setReceiveBufferSize(Integer receiveBufferSize) {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    public Integer getSendBufferSize() {
-        return sendBufferSize;
-    }
-
-    public void setSendBufferSize(Integer sendBufferSize) {
-        this.sendBufferSize = sendBufferSize;
-    }
-
-    public Boolean getTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    public void setTcpNoDelay(Boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    public Integer getTrafficClass() {
-        return trafficClass;
-    }
-
-    public void setTrafficClass(Integer trafficClass) {
-        this.trafficClass = trafficClass;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
deleted file mode 100644
index e02791a..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.logging.NiFiLog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for TCP/IP messages sent over unicast socket.
- *
- * @author unattributed
- */
-public abstract class SocketListener {
-
-    private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketListener.class));
-    private volatile ExecutorService executorService;  // volatile to guarantee most current value is visible
-    private volatile ServerSocket serverSocket;        // volatile to guarantee most current value is visible
-    private final int numThreads;
-    private final int port;
-    private final ServerSocketConfiguration configuration;
-    private final AtomicInteger shutdownListenerSeconds = new AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS);
-
-    public SocketListener(
-            final int numThreads,
-            final int port,
-            final ServerSocketConfiguration configuration) {
-
-        if (numThreads <= 0) {
-            throw new IllegalArgumentException("Number of threads may not be less than or equal to zero.");
-        } else if (configuration == null) {
-            throw new IllegalArgumentException("Server socket configuration may not be null.");
-        }
-
-        this.numThreads = numThreads;
-        this.port = port;
-        this.configuration = configuration;
-    }
-
-    /**
-     * Implements the action to perform when a new socket request is received.
-     * This class will close the socket.
-     *
-     * @param socket the socket
-     */
-    public abstract void dispatchRequest(final Socket socket);
-
-    public void start() throws IOException {
-
-        if (isRunning()) {
-            return;
-        }
-
-        try {
-            serverSocket = SocketUtils.createServerSocket(port, configuration);
-        } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) {
-            throw new IOException(e);
-        }
-
-        final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
-        executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
-            private final AtomicLong threadCounter = new AtomicLong(0L);
-
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread newThread = defaultThreadFactory.newThread(r);
-                newThread.setName("Process NCM Request-" + threadCounter.incrementAndGet());
-                return newThread;
-            }
-        });
-
-        final ExecutorService runnableExecServiceRef = executorService;
-        final ServerSocket runnableServerSocketRef = serverSocket;
-
-        final Thread t = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (runnableExecServiceRef.isShutdown() == false) {
-                    Socket socket = null;
-                    try {
-                        try {
-                            socket = runnableServerSocketRef.accept();
-                            if (configuration.getSocketTimeout() != null) {
-                                socket.setSoTimeout(configuration.getSocketTimeout());
-                            }
-                        } catch (final SocketTimeoutException ste) {
-                            // nobody connected to us. Go ahead and call closeQuietly just to make sure we don't leave
-                            // any sockets lingering
-                            SocketUtils.closeQuietly(socket);
-                            continue;
-                        } catch (final SocketException se) {
-                            logger.warn("Failed to communicate with " + (socket == null ? "Unknown Host" : socket.getInetAddress().getHostName()) + " due to " + se, se);
-                            SocketUtils.closeQuietly(socket);
-                            continue;
-                        } catch (final Throwable t) {
-                            logger.warn("Socket Listener encountered exception: " + t, t);
-                            SocketUtils.closeQuietly(socket);
-                            continue;
-                        }
-
-                        final Socket finalSocket = socket;
-                        runnableExecServiceRef.execute(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    dispatchRequest(finalSocket);
-                                } catch (final Throwable t) {
-                                    logger.warn("Dispatching socket request encountered exception due to: " + t, t);
-                                } finally {
-                                    SocketUtils.closeQuietly(finalSocket);
-                                }
-                            }
-                        });
-                    } catch (final Throwable t) {
-                        logger.error("Socket Listener encountered exception: " + t, t);
-                        SocketUtils.closeQuietly(socket);
-                    }
-                }
-            }
-        });
-        t.setName("Cluster Socket Listener");
-        t.start();
-    }
-
-    public boolean isRunning() {
-        return (executorService != null && executorService.isShutdown() == false);
-    }
-
-    public void stop() throws IOException {
-
-        if (isRunning() == false) {
-            return;
-        }
-
-        // shutdown executor service
-        try {
-            if (getShutdownListenerSeconds() <= 0) {
-                executorService.shutdownNow();
-            } else {
-                executorService.shutdown();
-            }
-            executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS);
-        } catch (final InterruptedException ex) {
-            Thread.currentThread().interrupt();
-        } finally {
-            if (executorService.isTerminated()) {
-                logger.info("Socket Listener has been terminated successfully.");
-            } else {
-                logger.warn("Socket Listener has not terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
-            }
-        }
-
-        // shutdown server socket
-        SocketUtils.closeQuietly(serverSocket);
-
-    }
-
-    public int getShutdownListenerSeconds() {
-        return shutdownListenerSeconds.get();
-    }
-
-    public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
-        this.shutdownListenerSeconds.set(shutdownListenerSeconds);
-    }
-
-    public ServerSocketConfiguration getConfiguration() {
-        return configuration;
-    }
-
-    public int getPort() {
-        if (isRunning()) {
-            return serverSocket.getLocalPort();
-        } else {
-            return port;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
deleted file mode 100644
index fb6a00c..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLSocket;
-
-import org.apache.nifi.logging.NiFiLog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author unattributed
- */
-public final class SocketUtils {
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class));
-
-    public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config) throws IOException {
-        if (address == null) {
-            throw new IllegalArgumentException("Socket address may not be null.");
-        } else if (config == null) {
-            throw new IllegalArgumentException("Configuration may not be null.");
-        }
-
-        final Socket socket;
-
-        final SSLContext sslContext;
-        try {
-            sslContext = config.createSSLContext();
-        } catch (final Exception e) {
-            throw new IOException("Could not create SSLContext", e);
-        }
-
-        if (sslContext == null) {
-            socket = new Socket(address.getHostName(), address.getPort());
-        } else {
-            socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort());
-        }
-
-        if (config.getSocketTimeout() != null) {
-            socket.setSoTimeout(config.getSocketTimeout());
-        }
-
-        if (config.getReuseAddress() != null) {
-            socket.setReuseAddress(config.getReuseAddress());
-        }
-
-        if (config.getReceiveBufferSize() != null) {
-            socket.setReceiveBufferSize(config.getReceiveBufferSize());
-        }
-
-        if (config.getSendBufferSize() != null) {
-            socket.setSendBufferSize(config.getSendBufferSize());
-        }
-
-        if (config.getTrafficClass() != null) {
-            socket.setTrafficClass(config.getTrafficClass());
-        }
-
-        if (config.getKeepAlive() != null) {
-            socket.setKeepAlive(config.getKeepAlive());
-        }
-
-        if (config.getOobInline() != null) {
-            socket.setOOBInline(config.getOobInline());
-        }
-
-        if (config.getTcpNoDelay() != null) {
-            socket.setTcpNoDelay(config.getTcpNoDelay());
-        }
-
-        return socket;
-    }
-
-    public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException {
-        if (config == null) {
-            throw new NullPointerException("Configuration may not be null.");
-        }
-
-        final SSLContext sslContext = config.createSSLContext();
-        final ServerSocket serverSocket;
-        if (sslContext == null) {
-            serverSocket = new ServerSocket(port);
-        } else {
-            serverSocket = sslContext.getServerSocketFactory().createServerSocket(port);
-            ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth());
-        }
-
-        if (config.getSocketTimeout() != null) {
-            serverSocket.setSoTimeout(config.getSocketTimeout());
-        }
-
-        if (config.getReuseAddress() != null) {
-            serverSocket.setReuseAddress(config.getReuseAddress());
-        }
-
-        if (config.getReceiveBufferSize() != null) {
-            serverSocket.setReceiveBufferSize(config.getReceiveBufferSize());
-        }
-
-        return serverSocket;
-    }
-
-    public static void closeQuietly(final Socket socket) {
-        if (socket == null) {
-            return;
-        }
-
-        try {
-            try {
-                // can't shudown input/output individually with secure sockets
-                if ((socket instanceof SSLSocket) == false) {
-                    if (socket.isInputShutdown() == false) {
-                        socket.shutdownInput();
-                    }
-                    if (socket.isOutputShutdown() == false) {
-                        socket.shutdownOutput();
-                    }
-                }
-            } finally {
-                if (socket.isClosed() == false) {
-                    socket.close();
-                }
-            }
-        } catch (final Exception ex) {
-            logger.debug("Failed to close socket due to: " + ex, ex);
-        }
-    }
-
-    public static void closeQuietly(final ServerSocket serverSocket) {
-        if (serverSocket == null) {
-            return;
-        }
-
-        try {
-            serverSocket.close();
-        } catch (final Exception ex) {
-            logger.debug("Failed to close server socket due to: " + ex, ex);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
deleted file mode 100644
index 7a62813..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-
-/**
- * A service that may be discovered at runtime. A service is defined as having a
- * unique case-sensitive service name and a socket address where it is
- * available.
- *
- * Implementations are expected to base equality on the service name only (see
- * {@link #getServiceName()}).
- *
- * @author unattributed
- */
-public interface DiscoverableService {
-
-    /**
-     * The service's name. Two services are considered equal if they have the
-     * same case-sensitive service name.
-     *
-     * @return the service's name
-     */
-    String getServiceName();
-
-    /**
-     * The socket address (host and port) at which the service is available.
-     *
-     * @return the service's address
-     */
-    InetSocketAddress getServiceAddress();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
deleted file mode 100644
index 5f378b9..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * A basic implementation of the DiscoverableService interface. Two services are
- * considered equal if they have the same case-sensitive service name; the
- * service address does not take part in equality or hashing.
- *
- * @author unattributed
- */
-public class DiscoverableServiceImpl implements DiscoverableService {
-
-    private final String serviceName;
-
-    private final InetSocketAddress serviceAddress;
-
-    /**
-     * Creates a service description.
-     *
-     * @param serviceName the name of the service; may not be null or blank
-     * @param serviceAddress the address of the service; may not be null
-     * @throws IllegalArgumentException if either argument is invalid
-     */
-    public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) {
-        if (StringUtils.isBlank(serviceName)) {
-            throw new IllegalArgumentException("Service name may not be null or empty.");
-        }
-        if (serviceAddress == null) {
-            throw new IllegalArgumentException("Service address may not be null.");
-        }
-        this.serviceName = serviceName;
-        this.serviceAddress = serviceAddress;
-    }
-
-    @Override
-    public InetSocketAddress getServiceAddress() {
-        return serviceAddress;
-    }
-
-    @Override
-    public String getServiceName() {
-        return serviceName;
-    }
-
-    @Override
-    public String toString() {
-        final String host = serviceAddress.getHostName();
-        final int port = serviceAddress.getPort();
-        return String.format("[Discoverable Service: %s available at %s:%d]", serviceName, host, port);
-    }
-
-    /**
-     * Compares by service name against any DiscoverableService, not only
-     * instances of this class.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        // instanceof is false for null, so no separate null check is needed
-        if (!(obj instanceof DiscoverableService)) {
-            return false;
-        }
-        // the constructor guarantees serviceName is never null
-        final String otherName = ((DiscoverableService) obj).getServiceName();
-        return serviceName.equals(otherName);
-    }
-
-    @Override
-    public int hashCode() {
-        // the constructor guarantees serviceName is never null
-        return 53 * 5 + serviceName.hashCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
deleted file mode 100644
index ea0b72a..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-/**
- * Mutable holder for multicast socket settings. The time-to-live starts at
- * {@link #DEFAULT_MULTICAST_TTL} and may never be set to null; every other
- * setting is a boxed value that starts out null.
- * NOTE(review): presumably a null setting means "leave the socket default
- * untouched" - confirm against the code that applies this configuration.
- *
- * @author unattributed
- */
-public final class MulticastConfiguration {
-
-    public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET;
-
-    private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL;
-    private Integer socketTimeout;
-    private Integer receiveBufferSize;
-    private Integer sendBufferSize;
-    private Boolean reuseAddress;
-    private Integer trafficClass;
-    private Boolean loopbackMode;
-
-    /**
-     * @return the multicast time-to-live; never null
-     */
-    public MulticastTimeToLive getTtl() {
-        return this.ttl;
-    }
-
-    /**
-     * @param ttl the multicast time-to-live
-     * @throws NullPointerException if ttl is null
-     */
-    public void setTtl(final MulticastTimeToLive ttl) {
-        if (null == ttl) {
-            throw new NullPointerException("Multicast TTL may not be null.");
-        }
-        this.ttl = ttl;
-    }
-
-    /**
-     * @return the socket timeout, or null if none was set
-     */
-    public Integer getSocketTimeout() {
-        return this.socketTimeout;
-    }
-
-    public void setSocketTimeout(Integer socketTimeout) {
-        this.socketTimeout = socketTimeout;
-    }
-
-    /**
-     * @return the reuse-address flag, or null if none was set
-     */
-    public Boolean getReuseAddress() {
-        return this.reuseAddress;
-    }
-
-    public void setReuseAddress(Boolean reuseAddress) {
-        this.reuseAddress = reuseAddress;
-    }
-
-    /**
-     * @return the receive buffer size, or null if none was set
-     */
-    public Integer getReceiveBufferSize() {
-        return this.receiveBufferSize;
-    }
-
-    public void setReceiveBufferSize(Integer receiveBufferSize) {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    /**
-     * @return the send buffer size, or null if none was set
-     */
-    public Integer getSendBufferSize() {
-        return this.sendBufferSize;
-    }
-
-    public void setSendBufferSize(Integer sendBufferSize) {
-        this.sendBufferSize = sendBufferSize;
-    }
-
-    /**
-     * @return the traffic class, or null if none was set
-     */
-    public Integer getTrafficClass() {
-        return this.trafficClass;
-    }
-
-    public void setTrafficClass(Integer trafficClass) {
-        this.trafficClass = trafficClass;
-    }
-
-    /**
-     * @return the loopback-mode flag, or null if none was set
-     */
-    public Boolean getLoopbackMode() {
-        return this.loopbackMode;
-    }
-
-    public void setLoopbackMode(Boolean loopbackMode) {
-        this.loopbackMode = loopbackMode;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
deleted file mode 100644
index e562c25..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for protocol messages sent over multicast. If a message
- * is of type MulticastProtocolMessage, then the underlying protocol message is
- * passed to the handler. If the receiving handler produces a message response,
- * then the message is wrapped with a MulticastProtocolMessage before being sent
- * to the originator.
- *
- * This class owns the socket and threads: {@link #start()} joins the multicast
- * group and spawns a receiver thread that hands each received datagram to a
- * fixed-size worker pool, which in turn calls
- * {@link #dispatchRequest(MulticastSocket, DatagramPacket)}. {@link #stop()}
- * shuts the pool down, leaves the group and closes the socket.
- *
- * @author unattributed
- */
-public abstract class MulticastListener {
-
-    // constants
-    private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
-    private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512;
-
-    private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class));
-
-    // immutable members
-    private final int numThreads;
-    private final InetSocketAddress multicastAddress;
-    private final MulticastConfiguration configuration;
-
-    private volatile ExecutorService executorService;     // volatile to guarantee most current value is visible
-    private volatile MulticastSocket multicastSocket;     // volatile to guarantee most current value is visible
-
-    private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS;
-    private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES;
-
-    /**
-     * Creates a listener; no socket is opened until {@link #start()} is called.
-     *
-     * @param numThreads size of the worker pool that dispatches received
-     * datagrams; must be greater than zero
-     * @param multicastAddress the multicast group address and port to listen on;
-     * must be resolved and must be a multicast address
-     * @param configuration the socket configuration; may not be null
-     * @throws IllegalArgumentException if any argument is invalid
-     */
-    public MulticastListener(
-            final int numThreads,
-            final InetSocketAddress multicastAddress,
-            final MulticastConfiguration configuration) {
-
-        if (numThreads <= 0) {
-            throw new IllegalArgumentException("Number of threads may not be less than or equal to zero.");
-        } else if (multicastAddress == null) {
-            throw new IllegalArgumentException("Multicast address may not be null.");
-        } else if (multicastAddress.getAddress() == null) {
-            // an unresolved InetSocketAddress has no InetAddress; fail clearly instead of with an NPE
-            throw new IllegalArgumentException("Multicast address must be resolved.");
-        } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
-            throw new IllegalArgumentException("Multicast group must be a Class D address.");
-        } else if (configuration == null) {
-            throw new IllegalArgumentException("Multicast configuration may not be null.");
-        }
-
-        this.numThreads = numThreads;
-        this.multicastAddress = multicastAddress;
-        this.configuration = configuration;
-    }
-
-    /**
-     * Implements the action to perform when a new datagram is received. This
-     * class must not close the multicast socket.
-     *
-     * @param multicastSocket the socket the datagram was received on
-     * @param packet the received datagram packet
-     */
-    public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet);
-
-    /**
-     * Opens the multicast socket, joins the group and starts receiving. Does
-     * nothing if the listener is already running.
-     *
-     * @throws IOException if the socket cannot be created or the group cannot
-     * be joined; in the latter case the socket is closed before rethrowing
-     */
-    public void start() throws IOException {
-
-        if (isRunning()) {
-            return;
-        }
-
-        final MulticastSocket socket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration);
-        try {
-            socket.joinGroup(multicastAddress.getAddress());
-        } catch (final IOException | RuntimeException e) {
-            // do not leak the freshly created socket when the join fails
-            socket.close();
-            throw e;
-        }
-
-        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-
-        // publish the socket first: isRunning() keys off the executor and stop() then reads the socket
-        multicastSocket = socket;
-        executorService = executor;
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                // also stop once the socket is closed; otherwise receive() fails immediately and the loop would spin
-                while (executor.isShutdown() == false && socket.isClosed() == false) {
-                    try {
-                        final byte[] buf = new byte[maxPacketSizeBytes];
-                        // use buf.length so a concurrent setMaxPacketSizeBytes cannot make the length exceed the buffer
-                        final DatagramPacket packet = new DatagramPacket(buf, buf.length);
-                        socket.receive(packet);
-                        executor.execute(new Runnable() {
-                            @Override
-                            public void run() {
-                                // use the captured socket, not the field, which may refer to a newer socket after a restart
-                                dispatchRequest(socket, packet);
-                            }
-                        });
-                    } catch (final SocketException | SocketTimeoutException ste) {
-                        /* ignore so that we can accept connections in approximately a non-blocking fashion */
-                    } catch (final Exception e) {
-                        logger.warn("Cluster protocol receiver encountered exception: " + e, e);
-                    }
-                }
-            }
-        }).start();
-    }
-
-    /**
-     * @return true if the listener has been started and not yet stopped
-     */
-    public boolean isRunning() {
-        final ExecutorService executor = executorService;
-        return (executor != null && executor.isShutdown() == false);
-    }
-
-    /**
-     * Shuts down the worker pool, waiting up to
-     * {@link #getShutdownListenerSeconds()} seconds for it to terminate, then
-     * leaves the multicast group and closes the socket. Does nothing if the
-     * listener is not running.
-     *
-     * @throws IOException if leaving the multicast group fails; the socket is
-     * closed regardless
-     */
-    public void stop() throws IOException {
-
-        if (isRunning() == false) {
-            return;
-        }
-
-        // shutdown executor service
-        final ExecutorService executor = executorService;
-        try {
-            if (getShutdownListenerSeconds() <= 0) {
-                executor.shutdownNow();
-            } else {
-                executor.shutdown();
-            }
-            executor.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS);
-        } catch (final InterruptedException ex) {
-            Thread.currentThread().interrupt();
-        } finally {
-            if (executor.isTerminated()) {
-                logger.info("Multicast Listener has been terminated successfully.");
-            } else {
-                logger.warn("Multicast Listener has not terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
-            }
-        }
-
-        // shutdown multicast socket
-        final MulticastSocket socket = multicastSocket;
-        if (socket.isClosed() == false) {
-            try {
-                socket.leaveGroup(multicastAddress.getAddress());
-            } finally {
-                // close even when leaving the group fails so the socket is never leaked
-                socket.close();
-            }
-        }
-
-    }
-
-    /**
-     * @return the number of seconds stop() waits for the worker pool to
-     * terminate; a value of zero or less makes stop() interrupt running tasks
-     */
-    public int getShutdownListenerSeconds() {
-        return shutdownListenerSeconds;
-    }
-
-    public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
-        this.shutdownListenerSeconds = shutdownListenerSeconds;
-    }
-
-    /**
-     * @return the size in bytes of the buffer allocated for each received
-     * datagram
-     */
-    public int getMaxPacketSizeBytes() {
-        return maxPacketSizeBytes;
-    }
-
-    /**
-     * @param maxPacketSizeBytes the receive buffer size in bytes
-     * @throws IllegalArgumentException if the size is not greater than zero
-     */
-    public void setMaxPacketSizeBytes(int maxPacketSizeBytes) {
-        if (maxPacketSizeBytes <= 0) {
-            throw new IllegalArgumentException("Max packet size must be greater than zero bytes.");
-        }
-        this.maxPacketSizeBytes = maxPacketSizeBytes;
-    }
-
-    public MulticastConfiguration getConfiguration() {
-        return configuration;
-    }
-
-    public InetSocketAddress getMulticastAddress() {
-        return multicastAddress;
-    }
-
-    public int getNumThreads() {
-        return numThreads;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
deleted file mode 100644
index c254c11..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-
-/**
- * Defines the interface for discovering services based on name. Services are
- * expected to be exposed via socket address and port. This interface adds the
- * multicast address to the ServiceDiscovery contract it extends.
- *
- * @author unattributed
- */
-public interface MulticastServiceDiscovery extends ServiceDiscovery {
-
-    /**
-     * NOTE(review): presumably the multicast group address and port used for
-     * discovery traffic - confirm against implementations.
-     *
-     * @return the multicast address
-     */
-    InetSocketAddress getMulticastAddress();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
deleted file mode 100644
index a3cff9b..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-
-/**
- * Defines the interface for broadcasting a service via multicast. This
- * interface adds the multicast address to the ServicesBroadcaster contract it
- * extends.
- *
- * @author unattributed
- */
-public interface MulticastServicesBroadcaster extends ServicesBroadcaster {
-
-    /**
-     * NOTE(review): presumably the multicast group address and port that
-     * broadcasts are sent to - confirm against implementations.
-     *
-     * @return the multicast address
-     */
-    InetSocketAddress getMulticastAddress();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
deleted file mode 100644
index dad1173..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-/**
- * Named multicast time-to-live settings. Each constant pairs a scope name with
- * the numeric TTL value returned by {@link #getTtl()}.
- *
- * @author unattributed
- */
-public enum MulticastTimeToLive {
-
-    SAME_HOST(0),
-    SAME_SUBNET(1),
-    SAME_SITE(32),
-    SAME_REGION(64),
-    SAME_CONTINENT(128),
-    UNRESTRICTED(255);
-
-    private final int ttl;
-
-    MulticastTimeToLive(final int ttl) {
-        this.ttl = ttl;
-    }
-
-    /**
-     * @return the numeric TTL value of this constant
-     */
-    public int getTtl() {
-        return ttl;
-    }
-
-    /**
-     * Looks up the constant whose TTL value equals the given value. Unlike
-     * {@link #valueOfByTtl(int)}, this can be called without already holding
-     * a constant.
-     *
-     * @param ttl the numeric TTL value to look up
-     * @return the matching constant, or null if no constant has that value
-     */
-    public static MulticastTimeToLive forTtl(final int ttl) {
-        for (final MulticastTimeToLive value : values()) {
-            if (value.getTtl() == ttl) {
-                return value;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Instance form of {@link #forTtl(int)}, kept for existing callers. The
-     * result does not depend on the constant it is invoked on.
-     *
-     * @param ttl the numeric TTL value to look up
-     * @return the matching constant, or null if no constant has that value
-     */
-    public MulticastTimeToLive valueOfByTtl(final int ttl) {
-        return forTtl(ttl);
-    }
-
-}