You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:04:28 UTC
[39/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
deleted file mode 100644
index 8a8b7c0..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Static helpers for creating, configuring and quietly closing
- * {@link MulticastSocket} instances.
- *
- * @author unattributed
- */
-public final class MulticastUtils {
-
-    private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class));
-
-    /**
-     * Creates a multicast socket on any available port and applies the given
-     * configuration to it.
-     *
-     * @param config the settings to apply; may not be null
-     * @return the configured socket
-     * @throws IOException if the socket cannot be created or configured
-     * @throws IllegalArgumentException if the configuration is null
-     */
-    public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException {
-        return createMulticastSocket(0, config);
-    }
-
-    /**
-     * Creates a multicast socket and applies the given configuration to it.
-     * If any setting is rejected, the newly created socket is closed before
-     * the failure is propagated, so a failed call does not leak the socket.
-     *
-     * @param port the port to bind to; a value of zero or less binds to any
-     * available port
-     * @param config the settings to apply; may not be null
-     * @return the configured socket
-     * @throws IOException if the socket cannot be created or configured
-     * @throws IllegalArgumentException if the configuration is null
-     */
-    public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException {
-        if (config == null) {
-            throw new IllegalArgumentException("Configuration may not be null.");
-        }
-
-        final MulticastSocket socket = (port <= 0) ? new MulticastSocket() : new MulticastSocket(port);
-        try {
-            applyConfiguration(socket, config);
-        } catch (final IOException | RuntimeException e) {
-            // The socket is already bound at this point; release it rather
-            // than leaking it to a caller that never receives a reference.
-            closeQuietly(socket);
-            throw e;
-        }
-        return socket;
-    }
-
-    /**
-     * Copies the settings from the configuration onto the socket. The TTL is
-     * always applied; every other setting is applied only when non-null.
-     */
-    private static void applyConfiguration(final MulticastSocket socket, final MulticastConfiguration config) throws IOException {
-        socket.setTimeToLive(config.getTtl().getTtl());
-
-        if (config.getSocketTimeout() != null) {
-            socket.setSoTimeout(config.getSocketTimeout());
-        }
-
-        if (config.getReuseAddress() != null) {
-            socket.setReuseAddress(config.getReuseAddress());
-        }
-
-        if (config.getReceiveBufferSize() != null) {
-            socket.setReceiveBufferSize(config.getReceiveBufferSize());
-        }
-
-        if (config.getSendBufferSize() != null) {
-            socket.setSendBufferSize(config.getSendBufferSize());
-        }
-
-        if (config.getTrafficClass() != null) {
-            socket.setTrafficClass(config.getTrafficClass());
-        }
-
-        if (config.getLoopbackMode() != null) {
-            socket.setLoopbackMode(config.getLoopbackMode());
-        }
-    }
-
-    /**
-     * Closes the socket, logging (at debug level) rather than propagating any
-     * failure. A null socket is ignored.
-     *
-     * @param socket the socket to close; may be null
-     */
-    public static void closeQuietly(final MulticastSocket socket) {
-        if (socket == null) {
-            return;
-        }
-
-        try {
-            socket.close();
-        } catch (final Exception ex) {
-            logger.debug("Failed to close multicast socket due to: " + ex, ex);
-        }
-    }
-
-    /**
-     * Leaves the given multicast group and then closes the socket. Failures
-     * of either step are logged at debug level and never propagated; the
-     * socket is closed even when leaving the group fails. A null socket is
-     * ignored.
-     *
-     * @param socket the socket to close; may be null
-     * @param groupAddress the multicast group to leave before closing
-     */
-    public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) {
-        if (socket == null) {
-            return;
-        }
-
-        try {
-            socket.leaveGroup(groupAddress);
-        } catch (final Exception ex) {
-            logger.debug("Failed to leave multicast group due to: " + ex, ex);
-        }
-
-        closeQuietly(socket);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
deleted file mode 100644
index 173146e..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-/**
- * Defines a generic interface for discovering services. An implementation
- * exposes the service it has located through {@link #getService()}; how the
- * lookup is performed is left to the implementation.
- *
- * @author unattributed
- */
-public interface ServiceDiscovery {
-
- /**
- * Returns the service located by this discovery mechanism.
- *
- * NOTE(review): this interface does not state whether null is returned
- * when no service has been discovered yet -- confirm with implementers.
- *
- * @return the discovered service
- */
- DiscoverableService getService();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
deleted file mode 100644
index 86260d8..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.util.Set;
-
-/**
- * Defines the interface for broadcasting a collection of services for client
- * discovery. Services are identified by their service name when they are
- * added to or removed from the broadcast set.
- *
- * @author unattributed
- */
-public interface ServicesBroadcaster {
-
- /**
- * @return the delay in milliseconds to wait between successive broadcasts
- */
- int getBroadcastDelayMs();
-
- /**
- * @return the set of services currently being broadcast
- */
- Set<DiscoverableService> getServices();
-
- /**
- * Adds the given service to the set of broadcasted services.
- *
- * @param service a service
- * @return true if the service was added to the set; false if a service with
- * the given service name already exists in the set.
- */
- boolean addService(DiscoverableService service);
-
- /**
- * Removes the service with the given service name from the set.
- *
- * @param serviceName a service name
- * @return true if the service was removed; false otherwise
- */
- boolean removeService(String serviceName);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
deleted file mode 100644
index b5240c9..0000000
--- a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.nifi.io.nio.ChannelListener;
-import org.apache.nifi.io.nio.consumer.StreamConsumer;
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Example server that listens for UDP data on port 20000 and for TCP
- * connections on ports 20001 and 20002. Every incoming stream is handed to a
- * {@link UselessStreamConsumer} that is driven by a scheduled task. The server
- * runs for 30 minutes, then shuts the listener down and drains the remaining
- * consumers.
- *
- * @author none
- */
-public final class ServerMain {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class);
-
-    /**
-     * Pause between sweeps of the consumer map. Without it the main thread
-     * would spin at full speed for the whole lifetime of the server.
-     */
-    private static final long SWEEP_INTERVAL_MILLIS = 10L;
-
-    /**
-     * Starts the listener, periodically reaps finished consumers until the
-     * 30 minute run time has elapsed (or the thread is interrupted), and then
-     * shuts everything down.
-     *
-     * @param args ignored
-     * @throws IOException if a channel cannot be opened
-     */
-    public static void main(final String[] args) throws IOException {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-
-        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
-        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap = new ConcurrentHashMap<>();
-        final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0);
-        ChannelListener listener = null;
-        try {
-            executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
-            listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS);
-            listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
-            listener.addDatagramChannel(null, 20000, 32 << 20);
-            LOGGER.info("Listening for UDP data on port 20000");
-            listener.addServerSocket(null, 20001, 64 << 20);
-            LOGGER.info("listening for TCP connections on port 20001");
-            listener.addServerSocket(null, 20002, 64 << 20);
-            LOGGER.info("listening for TCP connections on port 20002");
-            final Calendar endTime = Calendar.getInstance();
-            endTime.add(Calendar.MINUTE, 30);
-            while (true) {
-                processAllConsumers(consumerMap);
-                if (endTime.before(Calendar.getInstance())) {
-                    break; // time to shut down
-                }
-                if (!pauseBetweenSweeps()) {
-                    break; // interrupted: treat as a request to shut down
-                }
-            }
-        } finally {
-            if (listener != null) {
-                LOGGER.info("Shutting down server....");
-                listener.shutdown(1L, TimeUnit.SECONDS);
-                LOGGER.info("Consumer map size = " + consumerMap.size());
-                while (!consumerMap.isEmpty()) {
-                    processAllConsumers(consumerMap);
-                    if (!consumerMap.isEmpty() && !pauseBetweenSweeps()) {
-                        break; // interrupted: stop waiting for stragglers
-                    }
-                }
-                LOGGER.info("Consumer map size = " + consumerMap.size());
-            }
-            executor.shutdown();
-        }
-    }
-
-    /**
-     * Sleeps for one sweep interval.
-     *
-     * @return true if the pause completed; false if the thread was
-     * interrupted, in which case the interrupt flag is restored
-     */
-    private static boolean pauseBetweenSweeps() {
-        try {
-            TimeUnit.MILLISECONDS.sleep(SWEEP_INTERVAL_MILLIS);
-            return true;
-        } catch (final InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            return false;
-        }
-    }
-
-    /**
-     * Cancels the scheduled task of every consumer that reports it is
-     * finished and removes those consumers from the map.
-     */
-    private static void processAllConsumers(final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
-        final Set<StreamConsumer> deadConsumers = new HashSet<>();
-        for (final Map.Entry<StreamConsumer, ScheduledFuture<?>> entry : consumerMap.entrySet()) {
-            if (entry.getKey().isConsumerFinished()) {
-                entry.getValue().cancel(true);
-                deadConsumers.add(entry.getKey());
-            }
-        }
-        for (final StreamConsumer consumer : deadConsumers) {
-            LOGGER.debug("removing consumer " + consumer);
-            consumerMap.remove(consumer);
-        }
-    }
-
-    /**
-     * Scheduled task that invokes {@code process()} on a single consumer
-     * until that consumer reports it is finished.
-     */
-    public static final class ConsumerRunner implements Runnable {
-
-        private final StreamConsumer consumer;
-
-        public ConsumerRunner(final StreamConsumer consumer) {
-            this.consumer = consumer;
-        }
-
-        @Override
-        public void run() {
-            if (consumer.isConsumerFinished()) {
-                return;
-            }
-            try {
-                consumer.process();
-            } catch (IOException ex) {
-                LOGGER.error("", ex);
-            }
-        }
-    }
-
-    /**
-     * Factory that creates a {@link UselessStreamConsumer} per stream,
-     * schedules a {@link ConsumerRunner} for it every 10 milliseconds and
-     * records the resulting future in the shared consumer map.
-     */
-    public static final class ExampleStreamConsumerFactory implements StreamConsumerFactory {
-
-        final ScheduledExecutorService executor;
-        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap;
-
-        public ExampleStreamConsumerFactory(final ScheduledExecutorService executor, final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
-            this.executor = executor;
-            this.consumerMap = consumerMap;
-        }
-
-        @Override
-        public StreamConsumer newInstance(final String streamId) {
-            final StreamConsumer consumer = new UselessStreamConsumer(streamId);
-            final ScheduledFuture<?> future = executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, TimeUnit.MILLISECONDS);
-            consumerMap.put(consumer, future);
-            LOGGER.info("Added consumer: " + consumer);
-            return consumer;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
deleted file mode 100644
index b3d214e..0000000
--- a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Example client that opens TCP connections to localhost on ports 20001 and
- * 20002 from two threads and pushes zero-filled data through each connection.
- *
- * @author none
- */
-public class TCPClient {
-
-    private static final Logger logger = LoggerFactory.getLogger(TCPClient.class);
-
-    /**
-     * Starts one sender thread per target port; each thread makes ten
-     * connections in sequence.
-     */
-    public static void main(final String[] args) throws Exception {
-        final byte[] payload = TCPClient.makeBytes();
-        final Thread first = newSender(20001, payload);
-        final Thread second = newSender(20002, payload);
-        first.start();
-        second.start();
-    }
-
-    /**
-     * Builds (but does not start) a thread that sends the payload to the
-     * given port ten times, logging any failure instead of propagating it.
-     */
-    private static Thread newSender(final int port, final byte[] payload) {
-        return new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    int attempt = 0;
-                    while (attempt < 10) {
-                        sendData(port, payload);
-                        attempt++;
-                    }
-                } catch (Exception e) {
-                    logger.error("Blew exception", e);
-                }
-            }
-        });
-    }
-
-    /**
-     * @return a new zero-filled array of 2 &lt;&lt; 20 bytes
-     */
-    public static byte[] makeBytes() {
-        return new byte[2 << 20];
-    }
-
-    /**
-     * Connects to localhost on the given port, waits five seconds, writes the
-     * byte array 1000 times, flushes, and logs the number of bytes written.
-     * The socket is closed when the method returns or fails.
-     */
-    private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException {
-        long totalBytes = 0L;
-        try (Socket sock = new Socket("localhost", port)) {
-            sock.setTcpNoDelay(true);
-            sock.setSoTimeout(2000);
-            logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything...");
-            Thread.sleep(5000L);
-            int remaining = 1000;
-            while (remaining > 0) {
-                sock.getOutputStream().write(bytes);
-                totalBytes += bytes.length;
-                remaining--;
-            }
-            sock.getOutputStream().flush();
-        }
-        logger.info("Total bytes sent: " + totalBytes + " to port " + port);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
deleted file mode 100644
index 90f4c42..0000000
--- a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Example client that sends a fixed number of small UDP datagrams to
- * localhost port 20000 and logs how long the sends took.
- *
- * @author none
- */
-public class UDPClient {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class);
-
-    /**
-     * Sends 819200 datagrams of 128 bytes each (100 MB in total) and logs the
-     * elapsed time. The socket is closed once sending completes or fails.
-     *
-     * @param args ignored
-     * @throws Exception if the socket cannot be opened or a send fails
-     */
-    public static void main(final String[] args) throws Exception {
-        final byte[] buffer = UDPClient.makeBytes();
-        final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000));
-        final long durationMillis;
-        // try-with-resources releases the socket even when a send throws
-        try (DatagramSocket socket = new DatagramSocket()) {
-            final long startTime = System.nanoTime();
-            for (int i = 0; i < 819200; i++) { // 100 MB
-                socket.send(packet);
-            }
-            final long endTime = System.nanoTime();
-            durationMillis = (endTime - startTime) / 1000000;
-        }
-        LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis);
-    }
-
-    /**
-     * @return a new zero-filled array of 128 bytes used as the datagram payload
-     */
-    public static byte[] makeBytes() {
-        byte[] bytes = new byte[128];
-        return bytes;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java b/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
deleted file mode 100644
index 9ec26e9..0000000
--- a/nifi/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer;
-
-/**
- * Stream consumer used by the example server: it ignores every buffer it is
- * handed and only reports on standard error when the consumer is done.
- *
- * @author none
- */
-public class UselessStreamConsumer extends AbstractStreamConsumer {
-
- /**
- * @param id identifier passed through to the superclass constructor
- */
- public UselessStreamConsumer(final String id) {
- super(id);
- }
-
- /**
- * Intentionally empty: the buffer contents are not inspected or stored.
- */
- @Override
- protected void processBuffer(final ByteBuffer buffer) throws IOException {
- }
-
- /**
- * Prints a fixed marker line to standard error.
- */
- @Override
- protected void onConsumerDone() {
- System.err.println("IN consumer done");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/test/resources/log4j.xml b/nifi/commons/nifi-socket-utils/src/test/resources/log4j.xml
deleted file mode 100644
index 8e93769..0000000
--- a/nifi/commons/nifi-socket-utils/src/test/resources/log4j.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
- <!-- Appender for printing formatted log statements to the console. -->
- <appender name="console" class="org.apache.log4j.ConsoleAppender">
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] %40.40c - %m%n"/>
- </layout>
- </appender>
-
- <!-- Logger for managing logging statements for nifi -->
- <logger name="nifi">
- <level value="debug"/>
- </logger>
-
- <root>
- <level value="warn"/>
- <appender-ref ref="console"/>
- </root>
-</log4j:configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/.gitignore b/nifi/commons/nifi-utils/.gitignore
deleted file mode 100755
index 12c5231..0000000
--- a/nifi/commons/nifi-utils/.gitignore
+++ /dev/null
@@ -1,8 +0,0 @@
-/target
-/target
-/target
-/target
-/target
-/target
-/target
-/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/pom.xml b/nifi/commons/nifi-utils/pom.xml
deleted file mode 100644
index 7f2dc42..0000000
--- a/nifi/commons/nifi-utils/pom.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-utils</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- <packaging>jar</packaging>
- <name>NiFi Utils</name>
- <!--
- This project intentionally has no additional dependencies beyond that pulled in by the parent. It is a general purpose utility library
- and should keep its surface/tension minimal.
- -->
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
deleted file mode 100644
index 24f43ca..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.flowfile.attributes;
-
-/**
- * Enumerates the FlowFile attributes defined here, pairing each constant
- * with the string key that is returned by {@link #key()}.
- */
-public enum CoreAttributes implements FlowFileAttributeKey {
- /**
- * The flowfile's path indicates the relative directory to which a FlowFile belongs and does not
- * contain the filename
- */
- PATH("path"),
-
- /**
- * The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not
- * contain the filename
- */
- ABSOLUTE_PATH("absolute.path"),
-
- /**
- * The filename of the FlowFile. The filename should not contain any directory structure.
- */
- FILENAME("filename"),
-
- /**
- * A unique UUID assigned to this FlowFile
- */
- UUID("uuid"),
-
- /**
- * A numeric value indicating the FlowFile priority
- */
- PRIORITY("priority"),
-
- /**
- * The MIME Type of this FlowFile
- */
- MIME_TYPE("mime.type"),
-
- /**
- * Specifies the reason that a FlowFile is being discarded
- */
- DISCARD_REASON("discard.reason"),
-
- /**
- * Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
- */
- ALTERNATE_IDENTIFIER("alternate.identifier");
-
- // The attribute key string associated with this constant.
- private final String key;
- private CoreAttributes(final String key) {
- this.key = key;
- }
-
- /**
- * @return the attribute key string associated with this constant
- */
- @Override
- public String key() {
- return key;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
deleted file mode 100644
index cc6c28e..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.flowfile.attributes;
-
-/**
- * A key that identifies a FlowFile attribute by its string name.
- */
-public interface FlowFileAttributeKey {
- /**
- * @return the string form of this attribute key
- */
- String key();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
deleted file mode 100644
index 77c34c9..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-public class StandardVersionNegotiator implements VersionNegotiator {
-
- private final List<Integer> versions;
- private int curVersion;
-
- public StandardVersionNegotiator(final int... supportedVersions) {
- if (Objects.requireNonNull(supportedVersions).length == 0) {
- throw new IllegalArgumentException("At least one version must be supported");
- }
-
- final List<Integer> supported = new ArrayList<>();
- for (final int version : supportedVersions) {
- supported.add(version);
- }
- this.versions = Collections.unmodifiableList(supported);
- this.curVersion = supportedVersions[0];
- }
-
- @Override
- public int getVersion() {
- return curVersion;
- }
-
- @Override
- public void setVersion(final int version) throws IllegalArgumentException {
- if (!isVersionSupported(version)) {
- throw new IllegalArgumentException("Version " + version + " is not supported");
- }
-
- this.curVersion = version;
- }
-
- @Override
- public int getPreferredVersion() {
- return versions.get(0);
- }
-
- @Override
- public Integer getPreferredVersion(final int maxVersion) {
- for (final Integer version : this.versions) {
- if (maxVersion >= version) {
- return version;
- }
- }
- return null;
- }
-
- @Override
- public boolean isVersionSupported(final int version) {
- return versions.contains(version);
- }
-
- @Override
- public List<Integer> getSupportedVersions() {
- return versions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
deleted file mode 100644
index 74f9b3d..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.util.List;
-
/**
 * Tracks the version currently in use by a resource and answers questions
 * about which versions that resource supports.
 */
public interface VersionNegotiator {

    /**
     * @return the currently configured Version of this resource
     */
    int getVersion();

    /**
     * Sets the version of this resource to the specified version. Only the
     * lower byte of the version is relevant.
     *
     * @param version the version that this resource should use
     * @throws IllegalArgumentException if the given Version is not supported by
     * this resource, as is indicated by the {@link #isVersionSupported(int)}
     * method
     */
    void setVersion(int version) throws IllegalArgumentException;

    /**
     *
     * @return the Version of this resource that is preferred
     */
    int getPreferredVersion();

    /**
     * Gets the preferred version of this resource that is no greater than the
     * given maxVersion. If no acceptable version exists that is less than or
     * equal to <code>maxVersion</code>, then <code>null</code> is returned
     *
     * @param maxVersion the highest version that is acceptable to the caller
     * @return the preferred version that is no greater than
     * <code>maxVersion</code>, or <code>null</code> if there is none
     */
    Integer getPreferredVersion(int maxVersion);

    /**
     * Indicates whether or not the specified version is supported by this
     * resource
     *
     * @param version the version to check
     * @return <code>true</code> if the version is supported,
     * <code>false</code> otherwise
     */
    boolean isVersionSupported(int version);

    /**
     * @return the versions supported by this resource
     */
    List<Integer> getSupportedVersions();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
deleted file mode 100644
index 05fd915..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
/**
 * Indicates that the user disabled transmission while communications were
 * taking place with a peer. This is an unchecked exception and declares no
 * constructors of its own, so it carries neither a message nor a cause.
 */
public class TransmissionDisabledException extends RuntimeException {

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
deleted file mode 100644
index 71cf894..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.zip.DataFormatException;
-import java.util.zip.Inflater;
-
-public class CompressionInputStream extends InputStream {
-
- private final InputStream in;
- private final Inflater inflater;
-
- private byte[] compressedBuffer;
- private byte[] buffer;
-
- private int bufferIndex;
- private boolean eos = false; // whether or not we've reached the end of stream
- private boolean allDataRead = false; // different from eos b/c eos means allDataRead == true && buffer is empty
-
- private final byte[] fourByteBuffer = new byte[4];
-
- public CompressionInputStream(final InputStream in) {
- this.in = in;
- inflater = new Inflater();
-
- buffer = new byte[0];
- compressedBuffer = new byte[0];
- bufferIndex = 1;
- }
-
- private String toHex(final byte[] array) {
- final StringBuilder sb = new StringBuilder("0x");
- for (final byte b : array) {
- final String hex = Integer.toHexString(b).toUpperCase();
- if (hex.length() == 1) {
- sb.append("0");
- }
- sb.append(hex);
- }
- return sb.toString();
- }
-
- protected void readChunkHeader() throws IOException {
- // Ensure that we have a valid SYNC chunk
- fillBuffer(fourByteBuffer);
- if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
- throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
- }
-
- // determine the size of the decompressed buffer
- fillBuffer(fourByteBuffer);
- buffer = new byte[toInt(fourByteBuffer)];
-
- // determine the size of the compressed buffer
- fillBuffer(fourByteBuffer);
- compressedBuffer = new byte[toInt(fourByteBuffer)];
-
- bufferIndex = buffer.length; // indicate that buffer is empty
- }
-
- private int toInt(final byte[] data) {
- return ((data[0] & 0xFF) << 24)
- | ((data[1] & 0xFF) << 16)
- | ((data[2] & 0xFF) << 8)
- | (data[3] & 0xFF);
- }
-
- protected void bufferAndDecompress() throws IOException {
- if (allDataRead) {
- eos = true;
- return;
- }
-
- readChunkHeader();
- fillBuffer(compressedBuffer);
-
- inflater.setInput(compressedBuffer);
- try {
- inflater.inflate(buffer);
- } catch (final DataFormatException e) {
- throw new IOException(e);
- }
- inflater.reset();
-
- bufferIndex = 0;
- final int moreDataByte = in.read();
- if (moreDataByte < 1) {
- allDataRead = true;
- } else if (moreDataByte > 1) {
- throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
- }
- }
-
- private void fillBuffer(final byte[] buffer) throws IOException {
- int len;
- int bytesLeft = buffer.length;
- int bytesRead = 0;
- while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
- bytesLeft -= len;
- bytesRead += len;
- }
-
- if (bytesRead < buffer.length) {
- throw new EOFException();
- }
- }
-
- private boolean isBufferEmpty() {
- return bufferIndex >= buffer.length;
- }
-
- @Override
- public int read() throws IOException {
- if (eos) {
- return -1;
- }
-
- if (isBufferEmpty()) {
- bufferAndDecompress();
- }
-
- if (isBufferEmpty()) {
- eos = true;
- return -1;
- }
-
- return buffer[bufferIndex++];
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- if (eos) {
- return -1;
- }
-
- if (isBufferEmpty()) {
- bufferAndDecompress();
- }
-
- if (isBufferEmpty()) {
- eos = true;
- return -1;
- }
-
- final int free = buffer.length - bufferIndex;
- final int bytesToTransfer = Math.min(len, free);
- System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
- bufferIndex += bytesToTransfer;
-
- return bytesToTransfer;
- }
-
- /**
- * Does nothing. Does NOT close underlying InputStream
- * @throws java.io.IOException
- */
- @Override
- public void close() throws IOException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
deleted file mode 100644
index bc46b0f..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.Deflater;
-
/**
 * Buffers written bytes and emits them as independently deflated chunks. Each
 * chunk is written as the four SYNC bytes, the 4-byte big-endian size of the
 * uncompressed data, the 4-byte big-endian size of the compressed data and the
 * compressed payload. Every chunk after the first is preceded by a single 1
 * byte; closing the stream writes a single 0 byte to mark the end of the data.
 * <p>
 * Closing this stream flushes but does not close the underlying OutputStream.
 */
public class CompressionOutputStream extends OutputStream {

    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};

    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
    public static final int MIN_BUFFER_SIZE = 8 << 10;

    private final OutputStream out;
    private final Deflater deflater;

    private final byte[] buffer;
    private byte[] compressed; // grown on demand when a chunk does not compress

    private int bufferIndex = 0;
    private boolean dataWritten = false;
    private boolean closed = false;

    public CompressionOutputStream(final OutputStream outStream) {
        this(outStream, DEFAULT_BUFFER_SIZE);
    }

    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
    }

    /**
     * @param outStream the stream that receives the framed, compressed data
     * @param bufferSize the number of uncompressed bytes per chunk
     * @param level the Deflater compression level
     * @param strategy the Deflater compression strategy
     * @throws IllegalArgumentException if bufferSize is smaller than
     * {@link #MIN_BUFFER_SIZE}
     */
    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
        if (bufferSize < MIN_BUFFER_SIZE) {
            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
        }

        this.out = outStream;
        this.deflater = new Deflater(level);
        this.deflater.setStrategy(strategy);
        buffer = new byte[bufferSize];
        compressed = new byte[bufferSize + 64];
    }

    /**
     * Compresses the currently buffered chunk of data and sends it to the
     * output stream. Does nothing if no data is buffered.
     *
     * @throws IOException if writing to the underlying stream fails
     */
    protected void compressAndWrite() throws IOException {
        if (bufferIndex <= 0) {
            return;
        }

        deflater.setInput(buffer, 0, bufferIndex);
        deflater.finish();
        int compressedBytes = deflater.deflate(compressed);

        // Incompressible input can deflate to more than bufferSize + 64 bytes. Keep
        // draining the deflater into a larger array instead of silently dropping the tail.
        while (!deflater.finished()) {
            compressed = java.util.Arrays.copyOf(compressed, compressed.length * 2);
            compressedBytes += deflater.deflate(compressed, compressedBytes, compressed.length - compressedBytes);
        }

        writeChunkHeader(compressedBytes);
        out.write(compressed, 0, compressedBytes);

        bufferIndex = 0;
        deflater.reset();
    }

    private void writeChunkHeader(final int compressedBytes) throws IOException {
        // If we have already written data, write out a '1' to indicate that we have more data; when we close
        // the stream, we instead write a '0' to indicate that we are finished sending data.
        if (dataWritten) {
            out.write(1);
        }
        out.write(SYNC_BYTES);
        dataWritten = true;

        writeInt(out, bufferIndex);
        writeInt(out, compressedBytes);
    }

    /**
     * Writes the value as 4 bytes, most significant byte first.
     */
    private void writeInt(final OutputStream out, final int val) throws IOException {
        out.write(val >>> 24);
        out.write(val >>> 16);
        out.write(val >>> 8);
        out.write(val);
    }

    protected boolean bufferFull() {
        return bufferIndex >= buffer.length;
    }

    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream is closed");
        }
    }

    @Override
    public void write(final int b) throws IOException {
        ensureOpen();
        buffer[bufferIndex++] = (byte) (b & 0xFF);
        if (bufferFull()) {
            compressAndWrite();
        }
    }

    @Override
    public void write(final byte[] b) throws IOException {
        write(b, 0, b.length);
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        ensureOpen();
        int bytesLeft = len;
        while (bytesLeft > 0) {
            final int free = buffer.length - bufferIndex;
            final int bytesThisIteration = Math.min(bytesLeft, free);
            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
            bufferIndex += bytesThisIteration;

            bytesLeft -= bytesThisIteration;
            if (bufferFull()) {
                compressAndWrite();
            }
        }
    }

    /**
     * Emits any buffered data as a chunk and flushes the underlying stream.
     */
    @Override
    public void flush() throws IOException {
        compressAndWrite();
        // OutputStream.flush() is a no-op, so delegate to the wrapped stream to
        // actually push the chunk downstream
        out.flush();
    }

    /**
     * Emits any buffered data, writes the end-of-data marker and flushes the
     * underlying stream, then releases the deflater. The underlying stream is
     * NOT closed. Calling this method more than once has no further effect.
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            // a second terminator byte would corrupt whatever follows on the wire
            return;
        }

        try {
            compressAndWrite();
            out.write(0); // indicate that the stream is finished.
            out.flush();
        } finally {
            closed = true;
            deflater.end();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
deleted file mode 100644
index e03dfbf..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-public class InterruptableInputStream extends InputStream {
-
- private volatile boolean interrupted = false;
- private final InputStream in;
-
- public InterruptableInputStream(final InputStream in) {
- this.in = in;
- }
-
- @Override
- public int read() throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- return in.read();
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- return in.read(b);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- return in.read(b, off, len);
- }
-
- @Override
- public int available() throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- return in.available();
- }
-
- @Override
- public void close() throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- in.close();
- }
-
- @Override
- public synchronized void mark(int readlimit) {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- in.mark(readlimit);
- }
-
- @Override
- public boolean markSupported() {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- return in.markSupported();
- }
-
- @Override
- public synchronized void reset() throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- in.reset();
- }
-
- @Override
- public long skip(long n) throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- return in.skip(n);
- }
-
- public void interrupt() {
- interrupted = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
deleted file mode 100644
index cba5be6..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-public class InterruptableOutputStream extends OutputStream {
-
- private final OutputStream out;
- private volatile boolean interrupted = false;
-
- public InterruptableOutputStream(final OutputStream out) {
- this.out = out;
- }
-
- @Override
- public void write(int b) throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- out.write(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- out.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- out.write(b, off, len);
- }
-
- @Override
- public void close() throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- out.close();
- }
-
- @Override
- public void flush() throws IOException {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- out.flush();
- }
-
- public void interrupt() {
- this.interrupted = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
deleted file mode 100644
index 68913bd..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferStateManager {
-
- private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class);
-
- private ByteBuffer buffer;
- private Direction direction = Direction.WRITE;
-
- public BufferStateManager(final ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public BufferStateManager(final ByteBuffer buffer, final Direction direction) {
- this.buffer = buffer;
- this.direction = direction;
- }
-
- /**
- * Ensures that the buffer is at least as big as the size specified,
- * resizing the buffer if necessary. This operation MAY change the direction
- * of the buffer.
- *
- * @param requiredSize
- */
- public void ensureSize(final int requiredSize) {
- if (buffer.capacity() < requiredSize) {
- final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize);
-
- // we have to read buffer so make sure the direction is correct.
- if (direction == Direction.WRITE) {
- buffer.flip();
- }
-
- // Copy from buffer to newBuffer
- newBuffer.put(buffer);
-
- // Swap the buffers
- buffer = newBuffer;
-
- // the new buffer is ready to be written to
- direction = Direction.WRITE;
- }
- }
-
- public ByteBuffer prepareForWrite(final int requiredSize) {
- ensureSize(requiredSize);
-
- if (direction == Direction.READ) {
- direction = Direction.WRITE;
- buffer.position(buffer.limit());
- }
-
- buffer.limit(buffer.capacity());
- return buffer;
- }
-
- public ByteBuffer prepareForRead(final int requiredSize) {
- ensureSize(requiredSize);
-
- if (direction == Direction.WRITE) {
- direction = Direction.READ;
- buffer.flip();
- }
-
- return buffer;
- }
-
- /**
- * Clears the contents of the buffer and sets direction to WRITE
- */
- public void clear() {
- logger.debug("Clearing {}", buffer);
- buffer.clear();
- direction = Direction.WRITE;
- }
-
- public void compact() {
- final String before = buffer.toString();
- buffer.compact();
- logger.debug("Before compact: {}, after: {}", before, buffer);
- direction = Direction.WRITE;
- }
-
- public static enum Direction {
-
- READ, WRITE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
deleted file mode 100644
index 32a3f26..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
/**
 * InputStream view of a non-blocking SocketChannel. Reads poll the channel,
 * sleeping briefly while no data is available, and give up with a
 * SocketTimeoutException once the configured timeout (30 seconds unless
 * changed via {@link #setTimeout(int)}) has elapsed.
 */
public class SocketChannelInputStream extends InputStream {

    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
    private final SocketChannel channel;
    private volatile int timeoutMillis = 30000;

    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
    // byte consumed from the channel by isDataAvailable() that has not been handed to a reader yet
    private Byte bufferedByte = null;

    /**
     * @param socketChannel the channel to read from; it is switched to
     * non-blocking mode
     * @throws IOException if the channel cannot be configured
     */
    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
        // this class expects a non-blocking channel
        socketChannel.configureBlocking(false);
        this.channel = socketChannel;
    }

    public void setTimeout(final int timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Polls the channel until it delivers data or reports end-of-stream.
     *
     * @param dest the buffer to read into; must have space remaining
     * @return the number of bytes read (at least 1), or -1 at end-of-stream
     * @throws SocketTimeoutException if no data arrives within the timeout
     * @throws ClosedByInterruptException if the thread is interrupted while
     * waiting; the channel is closed and the interrupt flag is set first
     */
    private int readWithTimeout(final ByteBuffer dest) throws IOException {
        final long maxTime = System.currentTimeMillis() + timeoutMillis;
        int bytesRead;
        while ((bytesRead = channel.read(dest)) == 0) {
            if (System.currentTimeMillis() > maxTime) {
                throw new SocketTimeoutException("Timed out reading from socket");
            }
            try {
                TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
            } catch (InterruptedException e) {
                close();
                Thread.currentThread().interrupt(); // set the interrupt status
                throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
            }
        }
        return bytesRead;
    }

    @Override
    public int read() throws IOException {
        if (bufferedByte != null) {
            final int retVal = bufferedByte & 0xFF;
            bufferedByte = null;
            return retVal;
        }

        oneByteBuffer.clear();
        if (readWithTimeout(oneByteBuffer) == -1) {
            return -1;
        }
        oneByteBuffer.flip();
        return oneByteBuffer.get() & 0xFF;
    }

    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads up to len bytes into b. If a byte was previously buffered by
     * {@link #isDataAvailable()}, only that single byte is returned.
     *
     * @return the number of bytes read, 0 if len is 0, or -1 at end-of-stream
     * @throws IndexOutOfBoundsException if off/len do not describe a valid
     * region of b
     */
    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            // A zero-length read must return immediately. Polling the channel with a full
            // buffer always yields 0 bytes and would only end with a timeout.
            return 0;
        }

        if (bufferedByte != null) {
            final byte retVal = bufferedByte;
            bufferedByte = null;
            b[off] = retVal;
            return 1;
        }

        return readWithTimeout(ByteBuffer.wrap(b, off, len));
    }

    /**
     * @return 1 if a byte could be obtained from the channel without waiting,
     * 0 otherwise
     * @throws EOFException if the peer has closed the stream
     */
    @Override
    public int available() throws IOException {
        if (bufferedByte != null) {
            return 1;
        }

        isDataAvailable(); // attempt to read from socket
        return (bufferedByte == null) ? 0 : 1;
    }

    /**
     * Checks, without waiting, whether at least one byte can be read. A byte
     * pulled off the channel by this check is kept and returned by the next
     * read.
     *
     * @return true if a byte is available
     * @throws EOFException if the peer has closed the stream
     */
    public boolean isDataAvailable() throws IOException {
        if (bufferedByte != null) {
            return true;
        }

        oneByteBuffer.clear();
        final int bytesRead = channel.read(oneByteBuffer);
        if (bytesRead == -1) {
            throw new EOFException("Peer has closed the stream");
        }
        if (bytesRead > 0) {
            oneByteBuffer.flip();
            bufferedByte = oneByteBuffer.get();
            return true;
        }
        return false;
    }

    /**
     * Closes the underlying socket channel.
     * @throws java.io.IOException if closing the channel fails
     */
    @Override
    public void close() throws IOException {
        channel.close();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
deleted file mode 100644
index 77049ad..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An {@link OutputStream} backed by a non-blocking {@link SocketChannel}.
- * <p>
- * Each write call keeps retrying until every byte has been accepted by the
- * channel. When the channel accepts nothing, the caller sleeps for
- * {@link #CHANNEL_FULL_WAIT_NANOS} and tries again; if no progress is made for
- * longer than the configured timeout a {@link SocketTimeoutException} is thrown.
- * <p>
- * The single-byte scratch buffer is instance state and no method here is
- * synchronized, so concurrent writers must coordinate externally.
- */
-public class SocketChannelOutputStream extends OutputStream {
-
-    /** Pause between write attempts while the channel accepts no bytes (10 ms). */
-    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
-    private final SocketChannel channel;
-    /** Maximum time, in milliseconds, a write may go without progress; defaults to 30 seconds. */
-    private volatile int timeout = 30000;
-
-    /** Reusable scratch buffer for {@link #write(int)}. */
-    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
-
-    /**
-     * Wraps the given channel, switching it to non-blocking mode as a side effect.
-     *
-     * @param socketChannel the channel to write to
-     * @throws IOException if the channel cannot be put into non-blocking mode
-     */
-    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
-        // this class expects a non-blocking channel
-        socketChannel.configureBlocking(false);
-        this.channel = socketChannel;
-    }
-
-    /**
-     * Sets how long a write may stall before it fails. Takes effect for write
-     * calls that start after this method returns.
-     *
-     * @param timeoutMillis the stall limit in milliseconds
-     */
-    public void setTimeout(final int timeoutMillis) {
-        this.timeout = timeoutMillis;
-    }
-
-    /**
-     * Writes the low-order byte of {@code b} to the channel.
-     *
-     * @param b the value whose low eight bits are written
-     * @throws SocketTimeoutException if the channel accepts nothing within the timeout
-     * @throws ClosedByInterruptException if the thread is interrupted while waiting
-     * @throws IOException if the channel write fails
-     */
-    @Override
-    public void write(final int b) throws IOException {
-        // clear() alone fully resets position and limit; no flip() is needed first.
-        oneByteBuffer.clear();
-        oneByteBuffer.put((byte) b);
-        oneByteBuffer.flip();
-
-        writeFully(oneByteBuffer);
-    }
-
-    /**
-     * Writes the whole array to the channel.
-     *
-     * @param b the bytes to write
-     * @throws IOException see {@link #write(byte[], int, int)}
-     */
-    @Override
-    public void write(final byte[] b) throws IOException {
-        write(b, 0, b.length);
-    }
-
-    /**
-     * Writes {@code len} bytes of {@code b}, starting at {@code off}, to the channel.
-     *
-     * @param b the source array
-     * @param off index of the first byte to write
-     * @param len number of bytes to write
-     * @throws SocketTimeoutException if the channel accepts nothing within the timeout
-     * @throws ClosedByInterruptException if the thread is interrupted while waiting
-     * @throws IOException if the channel write fails
-     */
-    @Override
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        writeFully(ByteBuffer.wrap(b, off, len));
-    }
-
-    /**
-     * Drains {@code buffer} into the channel, sleeping between attempts while the
-     * channel is full. The stall timer restarts whenever any bytes are accepted.
-     */
-    private void writeFully(final ByteBuffer buffer) throws IOException {
-        // Read the volatile once so a concurrent setTimeout() cannot change the limit mid-write.
-        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(this.timeout);
-        // nanoTime() is monotonic, so wall-clock adjustments cannot shorten or extend the timeout.
-        long lastProgressNanos = System.nanoTime();
-
-        while (buffer.hasRemaining()) {
-            if (channel.write(buffer) != 0) {
-                lastProgressNanos = System.nanoTime();
-                continue;
-            }
-
-            if (System.nanoTime() - lastProgressNanos > timeoutNanos) {
-                throw new SocketTimeoutException("Timed out writing to socket");
-            }
-
-            try {
-                TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
-            } catch (InterruptedException e) {
-                try {
-                    close();
-                } finally {
-                    // Restore the interrupt status even when close() itself fails.
-                    Thread.currentThread().interrupt();
-                }
-                throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
-            }
-        }
-    }
-
-    /**
-     * Closes the underlying SocketChannel.
-     *
-     * @throws IOException if closing the channel fails
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-}