You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/31 04:44:17 UTC
[40/62] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
deleted file mode 100644
index 2ae2c07..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * @author none
- */
-public final class ChannelDispatcher implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
- private final Selector serverSocketSelector;
- private final Selector socketChannelSelector;
- private final ScheduledExecutorService executor;
- private final BufferPool emptyBuffers;
- private final StreamConsumerFactory factory;
- private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
- private final long timeout;
- private volatile boolean stop = false;
- public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
-
- public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
- final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
- this.serverSocketSelector = serverSocketSelector;
- this.socketChannelSelector = socketChannelSelector;
- this.executor = service;
- this.factory = factory;
- emptyBuffers = buffers;
- this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
- }
-
- public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
- channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit));
- }
-
- @Override
- public void run() {
- while (!stop) {
- try {
- selectServerSocketKeys();
- selectSocketChannelKeys();
- } catch (final Exception ex) {
- LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex});
- }
- }
- }
-
- /*
- * When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all
- * channels' keys.
- *
- * @throws IOException
- */
- private void selectServerSocketKeys() throws IOException {
- int numSelected = serverSocketSelector.select(timeout);
- if (numSelected == 0) {
- return;
- }
-
- // for each registered server socket - see if any connections are waiting to be established
- final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator();
- while (itr.hasNext()) {
- SelectionKey serverSocketkey = itr.next();
- final SelectableChannel channel = serverSocketkey.channel();
- AbstractChannelReader reader = null;
- if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) {
- final ServerSocketChannel ssChannel = (ServerSocketChannel) serverSocketkey.channel();
- final SocketChannel sChannel = ssChannel.accept();
- if (sChannel != null) {
- sChannel.configureBlocking(false);
- final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
- final String readerId = sChannel.socket().toString();
- reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory);
- final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L,
- channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS);
- reader.setScheduledFuture(readerFuture);
- socketChannelKey.attach(reader);
- }
- }
- itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0.
- if (reader != null && LOGGER.isDebugEnabled()) {
- LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
- }
- }
- }
-
- /*
- * When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors
- * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that
- * the selected key set is not manually changed via a remove operation.
- *
- * @throws IOException
- */
- private void selectSocketChannelKeys() throws IOException {
- // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
- // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
- int numSelected = socketChannelSelector.select(timeout);
- if (numSelected == 0) {
- return;
- }
-
- for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
- final SelectableChannel channel = socketChannelKey.channel();
- AbstractChannelReader reader = null;
- // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
- // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
- // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
- // way to tell if it's new is the lack of an attachment.
- if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
- reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
- socketChannelKey.attach(reader);
- final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
- TimeUnit.MILLISECONDS);
- reader.setScheduledFuture(readerFuture);
- }
- if (reader != null && LOGGER.isDebugEnabled()) {
- LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
- }
- }
-
- }
-
- public void stop() {
- stop = true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
deleted file mode 100644
index b0a1cfb..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides the entry point to NIO based socket listeners for NiFi
- * processors and services. There are 2 supported types of Listeners, Datagram
- * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
- * will create the ChannelDispatcher, which is a Runnable and is controlled via
- * the ScheduledExecutorService, which is also created by this class. The
- * ChannelDispatcher handles connections to the ServerSocketChannels and creates
- * the readers associated with the resulting SocketChannels. Additionally, this
- * creates and manages two Selectors, one for ServerSocketChannels and another
- * for SocketChannels and DatagramChannels.
- *
- * The threading model for this consists of one thread for the
- * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
- * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
- * fixed delay as the others are. It is throttled by the provided timeout value.
- * Within the ChannelDispatcher there are two blocking operations which will
- * block for the given timeout each time through the enclosing loop.
- *
- * All channels are cached in one of the two Selectors via their SelectionKey.
- * The serverSocketSelector maintains all the added ServerSocketChannels; the
- * socketChannelSelector maintains the all the add DatagramChannels and the
- * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
- * the SocketChannel is injected with the channel's associated reader.
- *
- * All ChannelReaders will get throttled by the unavailability of buffers in the
- * provided BufferPool. This is designed to create back pressure.
- *
- * @author none
- */
-public final class ChannelListener {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
- private final ScheduledExecutorService executor;
- private final Selector serverSocketSelector; // used to listen for new connections
- private final Selector socketChannelSelector; // used to listen on existing connections
- private final ChannelDispatcher channelDispatcher;
- private final BufferPool bufferPool;
- private final int initialBufferPoolSize;
- private volatile long channelReaderFrequencyMSecs = 50;
-
- public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
- TimeUnit unit) throws IOException {
- this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
- this.serverSocketSelector = Selector.open();
- this.socketChannelSelector = Selector.open();
- this.bufferPool = bufferPool;
- this.initialBufferPoolSize = bufferPool.size();
- channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
- timeout, unit);
- executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
- }
-
- public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) {
- channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit);
- channelDispatcher.setChannelReaderFrequency(period, unit);
- }
-
- /**
- * Adds a server socket channel for listening to connections.
- *
- * @param nicIPAddress - if null binds to wildcard address
- * @param port - port to bind to
- * @param receiveBufferSize - size of OS receive buffer to request. If less
- * than 0 then will not be set and OS default will win.
- * @throws IOException
- */
- public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
- throws IOException {
- final ServerSocketChannel ssChannel = ServerSocketChannel.open();
- ssChannel.configureBlocking(false);
- if (receiveBufferSize > 0) {
- ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
- final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
- if (actualReceiveBufSize < receiveBufferSize) {
- LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to "
- + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
- + "bytes. You may want to consider changing the Operating System's "
- + "maximum receive buffer");
- }
- }
- ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
- ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
- }
-
- /**
- * Binds to listen for data grams on the given local IPAddress/port
- *
- * @param nicIPAddress - if null will listen on wildcard address, which
- * means datagrams will be received on all local network interfaces.
- * Otherwise, will bind to the provided IP address associated with some NIC.
- * @param port - the port to listen on
- * @param receiveBufferSize - the number of bytes to request for a receive
- * buffer from OS
- * @throws IOException
- */
- public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
- throws IOException {
- final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
- dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
- }
-
- /**
- * Binds to listen for data grams on the given local IPAddress/port and
- * restricts receipt of datagrams to those from the provided host and port,
- * must specify both. This improves performance for datagrams coming from a
- * sender that is known a-priori.
- *
- * @param nicIPAddress - if null will listen on wildcard address, which
- * means datagrams will be received on all local network interfaces.
- * Otherwise, will bind to the provided IP address associated with some NIC.
- * @param port - the port to listen on. This is used to provide a well-known
- * destination for a sender.
- * @param receiveBufferSize - the number of bytes to request for a receive
- * buffer from OS
- * @param sendingHost - the hostname, or IP address, of the sender of
- * datagrams. Only datagrams from this host will be received. If this is
- * null the wildcard ip is used, which means datagrams may be received from
- * any network interface on the local host.
- * @param sendingPort - the port used by the sender of datagrams. Only
- * datagrams from this port will be received.
- * @throws IOException
- */
- public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
- final Integer sendingPort) throws IOException {
-
- if (sendingHost == null || sendingPort == null) {
- addDatagramChannel(nicIPAddress, port, receiveBufferSize);
- return;
- }
- final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
- dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
- dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
- }
-
- private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
- throws IOException {
- final DatagramChannel dChannel = DatagramChannel.open();
- dChannel.configureBlocking(false);
- if (receiveBufferSize > 0) {
- dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
- final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
- if (actualReceiveBufSize < receiveBufferSize) {
- LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to "
- + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
- + "bytes. You may want to consider changing the Operating System's "
- + "maximum receive buffer");
- }
- }
- dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- dChannel.bind(new InetSocketAddress(nicIPAddress, port));
- return dChannel;
- }
-
- public void shutdown(final long period, final TimeUnit timeUnit) {
- channelDispatcher.stop();
- for (SelectionKey selectionKey : socketChannelSelector.keys()) {
- final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment();
- selectionKey.cancel();
- if (reader != null) {
- while (!reader.isClosed()) {
- try {
- Thread.sleep(channelReaderFrequencyMSecs);
- } catch (InterruptedException e) {
- }
- }
- final ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
- readerFuture.cancel(false);
- }
- IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist...
- }
- IOUtils.closeQuietly(socketChannelSelector);
-
- for (SelectionKey selectionKey : serverSocketSelector.keys()) {
- selectionKey.cancel();
- IOUtils.closeQuietly(selectionKey.channel());
- }
- IOUtils.closeQuietly(serverSocketSelector);
- executor.shutdown();
- try {
- executor.awaitTermination(period, timeUnit);
- } catch (final InterruptedException ex) {
- LOGGER.warn("Interrupted while trying to shutdown executor");
- }
- final int currentBufferPoolSize = bufferPool.size();
- final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
- + " Current buffer count=" + currentBufferPoolSize
- + " Could indicate a buffer leak. Ensure all consumers are executed until they complete." : "";
- LOGGER.info("Channel listener shutdown. " + warning);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
deleted file mode 100644
index 1eb5c7e..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-/**
- *
- * @author none
- */
-public final class DatagramChannelReader extends AbstractChannelReader {
-
- public static final int MAX_UDP_PACKET_SIZE = 65507;
-
- public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
- super(id, key, empties, consumerFactory);
- }
-
- /**
- * Will receive UDP data from channel and won't receive anything unless the
- * given buffer has enough space for at least one full max udp packet.
- *
- * @param key
- * @param buffer
- * @return
- * @throws IOException
- */
- @Override
- protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
- final DatagramChannel dChannel = (DatagramChannel) key.channel();
- final int initialBufferPosition = buffer.position();
- while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
- if (dChannel.receive(buffer) == null) {
- break;
- }
- }
- return buffer.position() - initialBufferPosition;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
deleted file mode 100644
index db2c102..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-/**
- *
- * @author none
- */
-public final class SocketChannelReader extends AbstractChannelReader {
-
- public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
- super(id, key, empties, consumerFactory);
- }
-
- /**
- * Receives TCP data from the socket channel for the given key.
- *
- * @param key
- * @param buffer
- * @return
- * @throws IOException
- */
- @Override
- protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
- int bytesRead = 0;
- final SocketChannel sChannel = (SocketChannel) key.channel();
- while (key.isValid() && key.isReadable()) {
- bytesRead = sChannel.read(buffer);
- if (bytesRead <= 0) {
- break;
- }
- }
- return bytesRead;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
deleted file mode 100644
index fce59c6..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.consumer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-/**
- *
- * @author none
- */
-public abstract class AbstractStreamConsumer implements StreamConsumer {
-
- private final String uniqueId;
- private BufferPool bufferPool = null;
- private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
- private final AtomicBoolean streamEnded = new AtomicBoolean(false);
- private final AtomicBoolean consumerEnded = new AtomicBoolean(false);
-
- public AbstractStreamConsumer(final String id) {
- uniqueId = id;
- }
-
- @Override
- public final void setReturnBufferQueue(final BufferPool returnQueue) {
- bufferPool = returnQueue;
- }
-
- @Override
- public final void addFilledBuffer(final ByteBuffer buffer) {
- if (isConsumerFinished()) {
- buffer.clear();
- bufferPool.returnBuffer(buffer, buffer.remaining());
- } else {
- filledBuffers.add(buffer);
- }
- }
-
- @Override
- public final void process() throws IOException {
- if (isConsumerFinished()) {
- return;
- }
- if (streamEnded.get() && filledBuffers.isEmpty()) {
- consumerEnded.set(true);
- onConsumerDone();
- return;
- }
- final ByteBuffer buffer = filledBuffers.poll();
- if (buffer != null) {
- final int bytesToProcess = buffer.remaining();
- try {
- processBuffer(buffer);
- } finally {
- buffer.clear();
- bufferPool.returnBuffer(buffer, bytesToProcess);
- }
- }
- }
-
- protected abstract void processBuffer(ByteBuffer buffer) throws IOException;
-
- @Override
- public final void signalEndOfStream() {
- streamEnded.set(true);
- }
-
- /**
- * Convenience method that is called when the consumer is done processing
- * based on being told the signal is end of stream and has processed all
- * given buffers.
- */
- protected void onConsumerDone() {
- }
-
- @Override
- public final boolean isConsumerFinished() {
- return consumerEnded.get();
- }
-
- @Override
- public final String getId() {
- return uniqueId;
- }
-
- @Override
- public final boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
- return true;
- }
- if (obj.getClass() != getClass()) {
- return false;
- }
- AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
- return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
- }
-
- @Override
- public final int hashCode() {
- return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
- }
-
- @Override
- public final String toString() {
- return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
deleted file mode 100644
index d75b7d7..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.consumer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.nifi.io.nio.BufferPool;
-
-/**
- * A StreamConsumer must be thread safe. It may be accessed concurrently by a
- * thread providing data to process and another thread that is processing that
- * data.
- *
- * @author none
- */
-public interface StreamConsumer {
-
- /**
- * Will be called once just after construction. It provides the queue to
- * which processed and emptied and cleared buffers must be returned. For
- * each time <code>addFilledBuffer</code> is called there should be an
- * associated add to this given queue. If not, buffers will run out and all
- * stream processing will halt. READ THIS!!!
- *
- * @param returnQueue
- */
- void setReturnBufferQueue(BufferPool returnQueue);
-
- /**
- * Will be called by the thread that produces byte buffers with available
- * data to be processed. If the consumer is finished this should simply
- * return the given buffer to the return buffer queue (after it is cleared)
- *
- * @param buffer
- */
- void addFilledBuffer(ByteBuffer buffer);
-
- /**
- * Will be called by the thread that executes the consumption of data. May
- * be called many times though once <code>isConsumerFinished</code> returns
- * true this method will likely do nothing.
- * @throws java.io.IOException
- */
- void process() throws IOException;
-
- /**
- * Called once the end of the input stream is detected
- */
- void signalEndOfStream();
-
- /**
- * If true signals the consumer is done consuming data and will not process
- * any more buffers.
- *
- * @return
- */
- boolean isConsumerFinished();
-
- /**
- * Uniquely identifies the consumer
- *
- * @return
- */
- String getId();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
deleted file mode 100644
index df298d5..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.consumer;
-
-/**
- *
- * @author none
- */
-public interface StreamConsumerFactory {
-
- StreamConsumer newInstance(String streamId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
deleted file mode 100644
index 7ed5ad4..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.nifi.util.NiFiProperties;
-
-public class SSLContextFactory {
-
- private final String keystore;
- private final char[] keystorePass;
- private final String keystoreType;
- private final String truststore;
- private final char[] truststorePass;
- private final String truststoreType;
-
- private final KeyManager[] keyManagers;
- private final TrustManager[] trustManagers;
-
- public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
- keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
- keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
- keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);
-
- truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
- truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
- truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);
-
- // prepare the keystore
- final KeyStore keyStore = KeyStore.getInstance(keystoreType);
- keyStore.load(new FileInputStream(keystore), keystorePass);
- final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- keyManagerFactory.init(keyStore, keystorePass);
-
- // prepare the truststore
- final KeyStore trustStore = KeyStore.getInstance(truststoreType);
- trustStore.load(new FileInputStream(truststore), truststorePass);
- final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(trustStore);
-
- keyManagers = keyManagerFactory.getKeyManagers();
- trustManagers = trustManagerFactory.getTrustManagers();
- }
-
- private static char[] getPass(final String password) {
- return password == null ? null : password.toCharArray();
- }
-
- /**
- * Creates a SSLContext instance using the given information.
- *
- *
- * @return a SSLContext instance
- * @throws java.security.KeyStoreException
- * @throws java.io.IOException
- * @throws java.security.NoSuchAlgorithmException
- * @throws java.security.cert.CertificateException
- * @throws java.security.UnrecoverableKeyException
- * @throws java.security.KeyManagementException
- */
- public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
- UnrecoverableKeyException, KeyManagementException {
-
- // initialize the ssl context
- final SSLContext sslContext = SSLContext.getInstance("TLS");
- sslContext.init(keyManagers, trustManagers, new SecureRandom());
- sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
-
- return sslContext;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
deleted file mode 100644
index fc279fb..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.SSLContext;
-
-/**
- * @author unattributed
- */
-public final class ServerSocketConfiguration {
-
- private boolean needClientAuth;
- private Integer socketTimeout;
- private Boolean reuseAddress;
- private Integer receiveBufferSize;
- private SSLContextFactory sslContextFactory;
-
- public ServerSocketConfiguration() {
- }
-
- public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
- return sslContextFactory == null ? null : sslContextFactory.createSslContext();
- }
-
- public void setSSLContextFactory(final SSLContextFactory sslContextFactory) {
- this.sslContextFactory = sslContextFactory;
- }
-
- public Integer getSocketTimeout() {
- return socketTimeout;
- }
-
- public void setSocketTimeout(Integer socketTimeout) {
- this.socketTimeout = socketTimeout;
- }
-
- public boolean getNeedClientAuth() {
- return needClientAuth;
- }
-
- public void setNeedClientAuth(boolean needClientAuth) {
- this.needClientAuth = needClientAuth;
- }
-
- public Boolean getReuseAddress() {
- return reuseAddress;
- }
-
- public void setReuseAddress(Boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
- }
-
- public Integer getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize(Integer receiveBufferSize) {
- this.receiveBufferSize = receiveBufferSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
deleted file mode 100644
index c24b540..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.SSLContext;
-
-/**
- * @author unattributed
- */
-public final class SocketConfiguration {
-
- private Integer socketTimeout;
- private Integer receiveBufferSize;
- private Integer sendBufferSize;
- private Boolean reuseAddress;
- private Boolean keepAlive;
- private Boolean oobInline;
- private Boolean tcpNoDelay;
- private Integer trafficClass;
- private SSLContextFactory sslContextFactory;
-
- public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
- return sslContextFactory == null ? null : sslContextFactory.createSslContext();
- }
-
- public void setSSLContextFactory(final SSLContextFactory sslContextFactory) {
- this.sslContextFactory = sslContextFactory;
- }
-
- public Integer getSocketTimeout() {
- return socketTimeout;
- }
-
- public void setSocketTimeout(Integer socketTimeout) {
- this.socketTimeout = socketTimeout;
- }
-
- public Boolean getReuseAddress() {
- return reuseAddress;
- }
-
- public void setReuseAddress(Boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
- }
-
- public Boolean getKeepAlive() {
- return keepAlive;
- }
-
- public void setKeepAlive(Boolean keepAlive) {
- this.keepAlive = keepAlive;
- }
-
- public Boolean getOobInline() {
- return oobInline;
- }
-
- public void setOobInline(Boolean oobInline) {
- this.oobInline = oobInline;
- }
-
- public Integer getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize(Integer receiveBufferSize) {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public Integer getSendBufferSize() {
- return sendBufferSize;
- }
-
- public void setSendBufferSize(Integer sendBufferSize) {
- this.sendBufferSize = sendBufferSize;
- }
-
- public Boolean getTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(Boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public Integer getTrafficClass() {
- return trafficClass;
- }
-
- public void setTrafficClass(Integer trafficClass) {
- this.trafficClass = trafficClass;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
deleted file mode 100644
index e02791a..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.logging.NiFiLog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for TCP/IP messages sent over unicast socket.
- *
- * @author unattributed
- */
-public abstract class SocketListener {
-
- private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
- private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketListener.class));
- private volatile ExecutorService executorService; // volatile to guarantee most current value is visible
- private volatile ServerSocket serverSocket; // volatile to guarantee most current value is visible
- private final int numThreads;
- private final int port;
- private final ServerSocketConfiguration configuration;
- private final AtomicInteger shutdownListenerSeconds = new AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS);
-
- public SocketListener(
- final int numThreads,
- final int port,
- final ServerSocketConfiguration configuration) {
-
- if (numThreads <= 0) {
- throw new IllegalArgumentException("Number of threads may not be less than or equal to zero.");
- } else if (configuration == null) {
- throw new IllegalArgumentException("Server socket configuration may not be null.");
- }
-
- this.numThreads = numThreads;
- this.port = port;
- this.configuration = configuration;
- }
-
- /**
- * Implements the action to perform when a new socket request is received.
- * This class will close the socket.
- *
- * @param socket the socket
- */
- public abstract void dispatchRequest(final Socket socket);
-
- public void start() throws IOException {
-
- if (isRunning()) {
- return;
- }
-
- try {
- serverSocket = SocketUtils.createServerSocket(port, configuration);
- } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) {
- throw new IOException(e);
- }
-
- final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
- executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
- private final AtomicLong threadCounter = new AtomicLong(0L);
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread newThread = defaultThreadFactory.newThread(r);
- newThread.setName("Process NCM Request-" + threadCounter.incrementAndGet());
- return newThread;
- }
- });
-
- final ExecutorService runnableExecServiceRef = executorService;
- final ServerSocket runnableServerSocketRef = serverSocket;
-
- final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- while (runnableExecServiceRef.isShutdown() == false) {
- Socket socket = null;
- try {
- try {
- socket = runnableServerSocketRef.accept();
- if (configuration.getSocketTimeout() != null) {
- socket.setSoTimeout(configuration.getSocketTimeout());
- }
- } catch (final SocketTimeoutException ste) {
- // nobody connected to us. Go ahead and call closeQuietly just to make sure we don't leave
- // any sockets lingering
- SocketUtils.closeQuietly(socket);
- continue;
- } catch (final SocketException se) {
- logger.warn("Failed to communicate with " + (socket == null ? "Unknown Host" : socket.getInetAddress().getHostName()) + " due to " + se, se);
- SocketUtils.closeQuietly(socket);
- continue;
- } catch (final Throwable t) {
- logger.warn("Socket Listener encountered exception: " + t, t);
- SocketUtils.closeQuietly(socket);
- continue;
- }
-
- final Socket finalSocket = socket;
- runnableExecServiceRef.execute(new Runnable() {
- @Override
- public void run() {
- try {
- dispatchRequest(finalSocket);
- } catch (final Throwable t) {
- logger.warn("Dispatching socket request encountered exception due to: " + t, t);
- } finally {
- SocketUtils.closeQuietly(finalSocket);
- }
- }
- });
- } catch (final Throwable t) {
- logger.error("Socket Listener encountered exception: " + t, t);
- SocketUtils.closeQuietly(socket);
- }
- }
- }
- });
- t.setName("Cluster Socket Listener");
- t.start();
- }
-
- public boolean isRunning() {
- return (executorService != null && executorService.isShutdown() == false);
- }
-
- public void stop() throws IOException {
-
- if (isRunning() == false) {
- return;
- }
-
- // shutdown executor service
- try {
- if (getShutdownListenerSeconds() <= 0) {
- executorService.shutdownNow();
- } else {
- executorService.shutdown();
- }
- executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS);
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- } finally {
- if (executorService.isTerminated()) {
- logger.info("Socket Listener has been terminated successfully.");
- } else {
- logger.warn("Socket Listener has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
- }
- }
-
- // shutdown server socket
- SocketUtils.closeQuietly(serverSocket);
-
- }
-
- public int getShutdownListenerSeconds() {
- return shutdownListenerSeconds.get();
- }
-
- public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
- this.shutdownListenerSeconds.set(shutdownListenerSeconds);
- }
-
- public ServerSocketConfiguration getConfiguration() {
- return configuration;
- }
-
- public int getPort() {
- if (isRunning()) {
- return serverSocket.getLocalPort();
- } else {
- return port;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
deleted file mode 100644
index fb6a00c..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLSocket;
-
-import org.apache.nifi.logging.NiFiLog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author unattributed
- */
-public final class SocketUtils {
-
- private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class));
-
- public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config) throws IOException {
- if (address == null) {
- throw new IllegalArgumentException("Socket address may not be null.");
- } else if (config == null) {
- throw new IllegalArgumentException("Configuration may not be null.");
- }
-
- final Socket socket;
-
- final SSLContext sslContext;
- try {
- sslContext = config.createSSLContext();
- } catch (final Exception e) {
- throw new IOException("Could not create SSLContext", e);
- }
-
- if (sslContext == null) {
- socket = new Socket(address.getHostName(), address.getPort());
- } else {
- socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort());
- }
-
- if (config.getSocketTimeout() != null) {
- socket.setSoTimeout(config.getSocketTimeout());
- }
-
- if (config.getReuseAddress() != null) {
- socket.setReuseAddress(config.getReuseAddress());
- }
-
- if (config.getReceiveBufferSize() != null) {
- socket.setReceiveBufferSize(config.getReceiveBufferSize());
- }
-
- if (config.getSendBufferSize() != null) {
- socket.setSendBufferSize(config.getSendBufferSize());
- }
-
- if (config.getTrafficClass() != null) {
- socket.setTrafficClass(config.getTrafficClass());
- }
-
- if (config.getKeepAlive() != null) {
- socket.setKeepAlive(config.getKeepAlive());
- }
-
- if (config.getOobInline() != null) {
- socket.setOOBInline(config.getOobInline());
- }
-
- if (config.getTcpNoDelay() != null) {
- socket.setTcpNoDelay(config.getTcpNoDelay());
- }
-
- return socket;
- }
-
- public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException {
- if (config == null) {
- throw new NullPointerException("Configuration may not be null.");
- }
-
- final SSLContext sslContext = config.createSSLContext();
- final ServerSocket serverSocket;
- if (sslContext == null) {
- serverSocket = new ServerSocket(port);
- } else {
- serverSocket = sslContext.getServerSocketFactory().createServerSocket(port);
- ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth());
- }
-
- if (config.getSocketTimeout() != null) {
- serverSocket.setSoTimeout(config.getSocketTimeout());
- }
-
- if (config.getReuseAddress() != null) {
- serverSocket.setReuseAddress(config.getReuseAddress());
- }
-
- if (config.getReceiveBufferSize() != null) {
- serverSocket.setReceiveBufferSize(config.getReceiveBufferSize());
- }
-
- return serverSocket;
- }
-
- public static void closeQuietly(final Socket socket) {
- if (socket == null) {
- return;
- }
-
- try {
- try {
- // can't shudown input/output individually with secure sockets
- if ((socket instanceof SSLSocket) == false) {
- if (socket.isInputShutdown() == false) {
- socket.shutdownInput();
- }
- if (socket.isOutputShutdown() == false) {
- socket.shutdownOutput();
- }
- }
- } finally {
- if (socket.isClosed() == false) {
- socket.close();
- }
- }
- } catch (final Exception ex) {
- logger.debug("Failed to close socket due to: " + ex, ex);
- }
- }
-
- public static void closeQuietly(final ServerSocket serverSocket) {
- if (serverSocket == null) {
- return;
- }
-
- try {
- serverSocket.close();
- } catch (final Exception ex) {
- logger.debug("Failed to close server socket due to: " + ex, ex);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
deleted file mode 100644
index 7a62813..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-
-/**
- * A service that may be discovered at runtime. A service is defined as having a
- * unique case-sensitive service name and a socket address where it is
- * available.
- *
- * @author unattributed
- */
-public interface DiscoverableService {
-
- /**
- * The service's name. Two services are considered equal if they have the
- * same case sensitive service name.
- *
- * @return the service's name
- */
- String getServiceName();
-
- /**
- * @return the service's address
- */
- InetSocketAddress getServiceAddress();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
deleted file mode 100644
index 5f378b9..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * A basic implementation of the DiscoverableService interface. To services are
- * considered equal if they have the same case-sensitive service name.
- *
- * @author unattributed
- */
-public class DiscoverableServiceImpl implements DiscoverableService {
-
- private final String serviceName;
-
- private final InetSocketAddress serviceAddress;
-
- public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) {
- if (StringUtils.isBlank(serviceName)) {
- throw new IllegalArgumentException("Service name may not be null or empty.");
- } else if (serviceAddress == null) {
- throw new IllegalArgumentException("Service address may not be null.");
- }
- this.serviceName = serviceName;
- this.serviceAddress = serviceAddress;
- }
-
- @Override
- public InetSocketAddress getServiceAddress() {
- return serviceAddress;
- }
-
- @Override
- public String getServiceName() {
- return serviceName;
- }
-
- @Override
- public String toString() {
- return String.format("[Discoverable Service: %s available at %s:%d]", serviceName, serviceAddress.getHostName(), serviceAddress.getPort());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof DiscoverableService)) {
- return false;
- }
- final DiscoverableService other = (DiscoverableService) obj;
- return !((this.serviceName == null) ? (other.getServiceName() != null) : !this.serviceName.equals(other.getServiceName()));
- }
-
- @Override
- public int hashCode() {
- int hash = 5;
- hash = 53 * hash + (this.serviceName != null ? this.serviceName.hashCode() : 0);
- return hash;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
deleted file mode 100644
index ea0b72a..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-/**
- * @author unattributed
- */
-public final class MulticastConfiguration {
-
- private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL;
-
- private Integer socketTimeout;
-
- private Integer receiveBufferSize;
-
- private Integer sendBufferSize;
-
- private Boolean reuseAddress;
-
- private Integer trafficClass;
-
- private Boolean loopbackMode;
-
- public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET;
-
- public MulticastTimeToLive getTtl() {
- return ttl;
- }
-
- public void setTtl(final MulticastTimeToLive ttl) {
- if (ttl == null) {
- throw new NullPointerException("Multicast TTL may not be null.");
- }
- this.ttl = ttl;
- }
-
- public Integer getSocketTimeout() {
- return socketTimeout;
- }
-
- public void setSocketTimeout(Integer socketTimeout) {
- this.socketTimeout = socketTimeout;
- }
-
- public Boolean getReuseAddress() {
- return reuseAddress;
- }
-
- public void setReuseAddress(Boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
- }
-
- public Integer getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize(Integer receiveBufferSize) {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public Integer getSendBufferSize() {
- return sendBufferSize;
- }
-
- public void setSendBufferSize(Integer sendBufferSize) {
- this.sendBufferSize = sendBufferSize;
- }
-
- public Integer getTrafficClass() {
- return trafficClass;
- }
-
- public void setTrafficClass(Integer trafficClass) {
- this.trafficClass = trafficClass;
- }
-
- public Boolean getLoopbackMode() {
- return loopbackMode;
- }
-
- public void setLoopbackMode(Boolean loopbackMode) {
- this.loopbackMode = loopbackMode;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
deleted file mode 100644
index e562c25..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for protocol messages sent over multicast. If a message
- * is of type MulticastProtocolMessage, then the underlying protocol message is
- * passed to the handler. If the receiving handler produces a message response,
- * then the message is wrapped with a MulticastProtocolMessage before being sent
- * to the originator.
- *
- * @author unattributed
- */
-public abstract class MulticastListener {
-
- // constants
- private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
- private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512;
-
- private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class));
-
- // immutable members
- private final int numThreads;
- private final InetSocketAddress multicastAddress;
- private final MulticastConfiguration configuration;
-
- private volatile ExecutorService executorService; // volatile to guarantee most current value is visible
- private volatile MulticastSocket multicastSocket; // volatile to guarantee most current value is visible
-
- private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS;
- private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES;
-
- public MulticastListener(
- final int numThreads,
- final InetSocketAddress multicastAddress,
- final MulticastConfiguration configuration) {
-
- if (numThreads <= 0) {
- throw new IllegalArgumentException("Number of threads may not be less than or equal to zero.");
- } else if (multicastAddress == null) {
- throw new IllegalArgumentException("Multicast address may not be null.");
- } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
- throw new IllegalArgumentException("Multicast group must be a Class D address.");
- } else if (configuration == null) {
- throw new IllegalArgumentException("Multicast configuration may not be null.");
- }
-
- this.numThreads = numThreads;
- this.multicastAddress = multicastAddress;
- this.configuration = configuration;
- }
-
- /**
- * Implements the action to perform when a new datagram is received. This
- * class must not close the multicast socket.
- *
- * @param multicastSocket
- * @param packet the datagram socket
- */
- public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet);
-
- public void start() throws IOException {
-
- if (isRunning()) {
- return;
- }
-
- multicastSocket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration);
- multicastSocket.joinGroup(multicastAddress.getAddress());
-
- executorService = Executors.newFixedThreadPool(numThreads);
-
- final ExecutorService runnableExecServiceRef = executorService;
- final MulticastSocket runnableMulticastSocketRef = multicastSocket;
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (runnableExecServiceRef.isShutdown() == false) {
- try {
- final byte[] buf = new byte[maxPacketSizeBytes];
- final DatagramPacket packet = new DatagramPacket(buf, maxPacketSizeBytes);
- runnableMulticastSocketRef.receive(packet);
- runnableExecServiceRef.execute(new Runnable() {
- @Override
- public void run() {
- dispatchRequest(multicastSocket, packet);
- }
- });
- } catch (final SocketException | SocketTimeoutException ste) {
- /* ignore so that we can accept connections in approximately a non-blocking fashion */
- } catch (final Exception e) {
- logger.warn("Cluster protocol receiver encountered exception: " + e, e);
- }
- }
- }
- }).start();
- }
-
- public boolean isRunning() {
- return (executorService != null && executorService.isShutdown() == false);
- }
-
- public void stop() throws IOException {
-
- if (isRunning() == false) {
- return;
- }
-
- // shutdown executor service
- try {
- if (getShutdownListenerSeconds() <= 0) {
- executorService.shutdownNow();
- } else {
- executorService.shutdown();
- }
- executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS);
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- } finally {
- if (executorService.isTerminated()) {
- logger.info("Multicast Listener has been terminated successfully.");
- } else {
- logger.warn("Multicast Listener has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
- }
- }
-
- // shutdown server socket
- if (multicastSocket.isClosed() == false) {
- multicastSocket.leaveGroup(multicastAddress.getAddress());
- multicastSocket.close();
- }
-
- }
-
- public int getShutdownListenerSeconds() {
- return shutdownListenerSeconds;
- }
-
- public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
- this.shutdownListenerSeconds = shutdownListenerSeconds;
- }
-
- public int getMaxPacketSizeBytes() {
- return maxPacketSizeBytes;
- }
-
- public void setMaxPacketSizeBytes(int maxPacketSizeBytes) {
- if (maxPacketSizeBytes <= 0) {
- throw new IllegalArgumentException("Max packet size must be greater than zero bytes.");
- }
- this.maxPacketSizeBytes = maxPacketSizeBytes;
- }
-
- public MulticastConfiguration getConfiguration() {
- return configuration;
- }
-
- public InetSocketAddress getMulticastAddress() {
- return multicastAddress;
- }
-
- public int getNumThreads() {
- return numThreads;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
deleted file mode 100644
index c254c11..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-
-/**
- * Defines the interface for discovering services based on name. Services are
- * expected to be exposed via socket address and port.
- *
- * @author unattributed
- */
-public interface MulticastServiceDiscovery extends ServiceDiscovery {
-
- /**
- * @return the multicast address
- */
- InetSocketAddress getMulticastAddress();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
deleted file mode 100644
index a3cff9b..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.net.InetSocketAddress;
-
-/**
- * Defines the interface for broadcasting a service via multicast.
- *
- * @author unattributed
- */
-public interface MulticastServicesBroadcaster extends ServicesBroadcaster {
-
- /**
- * @return the multicast address
- */
- InetSocketAddress getMulticastAddress();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
deleted file mode 100644
index dad1173..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-/**
- * @author unattributed
- */
-public enum MulticastTimeToLive {
-
- SAME_HOST(0),
- SAME_SUBNET(1),
- SAME_SITE(32),
- SAME_REGION(64),
- SAME_CONTINENT(128),
- UNRESTRICTED(255);
-
- private final int ttl;
-
- MulticastTimeToLive(final int ttl) {
- this.ttl = ttl;
- }
-
- public int getTtl() {
- return ttl;
- }
-
- public MulticastTimeToLive valueOfByTtl(final int ttl) {
- for (final MulticastTimeToLive value : values()) {
- if (value.getTtl() == ttl) {
- return value;
- }
- }
- return null;
- }
-
-}