You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 05:39:49 UTC
[5/7] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 1b5412c..36d8bac 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -91,21 +91,22 @@ import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;
public class EndpointConnectionPool {
+
public static final long PEER_REFRESH_PERIOD = 60000L;
public static final String CATEGORY = "Site-to-Site";
public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
- private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
-
- private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
+ private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
+
+ private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
private final URI clusterUrl;
private final String apiUri;
-
+
private final AtomicLong peerIndex = new AtomicLong(0L);
-
+
private final ReentrantLock peerRefreshLock = new ReentrantLock();
private volatile List<PeerStatus> peerStatuses;
private volatile long peerRefreshTime = 0L;
@@ -118,132 +119,129 @@ public class EndpointConnectionPool {
private final ScheduledExecutorService taskExecutor;
private final int idleExpirationMillis;
private final RemoteDestination remoteDestination;
-
+
private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
private Integer siteToSitePort;
private Boolean siteToSiteSecure;
private long remoteRefreshTime;
- private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
- private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
-
+ private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
+ private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
+
private volatile int commsTimeout;
private volatile boolean shutdown = false;
-
-
- public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis,
- final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile)
- {
- this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
- }
-
+
+ public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis,
+ final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile) {
+ this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
+ }
+
public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
- final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile)
- {
+ final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
Objects.requireNonNull(clusterUrl, "URL cannot be null");
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
- try {
- this.clusterUrl = new URI(clusterUrl);
- } catch (final URISyntaxException e) {
- throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
- }
-
- // Trim the trailing /
+ try {
+ this.clusterUrl = new URI(clusterUrl);
+ } catch (final URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
+ }
+
+ // Trim the trailing /
String uriPath = this.clusterUrl.getPath();
if (uriPath.endsWith("/")) {
uriPath = uriPath.substring(0, uriPath.length() - 1);
}
apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
-
+
this.remoteDestination = remoteDestination;
- this.sslContext = sslContext;
- this.peersFile = persistenceFile;
- this.eventReporter = eventReporter;
- this.commsTimeout = commsTimeoutMillis;
- this.idleExpirationMillis = idleExpirationMillis;
-
- Set<PeerStatus> recoveredStatuses;
- if ( persistenceFile != null && persistenceFile.exists() ) {
- try {
- recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
- this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified());
- } catch (final IOException ioe) {
- logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
- }
- } else {
- peerStatusCache = null;
- }
-
- // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
- // connections and keep our list of peers up-to-date.
- taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread thread = defaultFactory.newThread(r);
- thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
- return thread;
- }
- });
-
- taskExecutor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- refreshPeers();
- }
- }, 0, 5, TimeUnit.SECONDS);
-
- taskExecutor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- cleanupExpiredSockets();
- }
- }, 5, 5, TimeUnit.SECONDS);
- }
-
+ this.sslContext = sslContext;
+ this.peersFile = persistenceFile;
+ this.eventReporter = eventReporter;
+ this.commsTimeout = commsTimeoutMillis;
+ this.idleExpirationMillis = idleExpirationMillis;
+
+ Set<PeerStatus> recoveredStatuses;
+ if (persistenceFile != null && persistenceFile.exists()) {
+ try {
+ recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
+ this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified());
+ } catch (final IOException ioe) {
+ logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
+ }
+ } else {
+ peerStatusCache = null;
+ }
+
+ // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
+ // connections and keep our list of peers up-to-date.
+ taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thread = defaultFactory.newThread(r);
+ thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
+ return thread;
+ }
+ });
+
+ taskExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ refreshPeers();
+ }
+ }, 0, 5, TimeUnit.SECONDS);
+
+ taskExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ cleanupExpiredSockets();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ }
+
void warn(final String msg, final Object... args) {
- logger.warn(msg, args);
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
- }
+ logger.warn(msg, args);
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
+ }
}
-
+
void warn(final String msg, final Throwable t) {
- logger.warn(msg, t);
-
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString());
- }
+ logger.warn(msg, t);
+
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString());
+ }
}
-
+
void error(final String msg, final Object... args) {
- logger.error(msg, args);
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
- }
+ logger.error(msg, args);
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
+ }
}
-
+
private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {
- if ( remoteDestination.getIdentifier() != null ) {
+ if (remoteDestination.getIdentifier() != null) {
return remoteDestination.getIdentifier();
}
-
- if ( transferDirection == TransferDirection.RECEIVE ) {
+
+ if (transferDirection == TransferDirection.RECEIVE) {
return getOutputPortIdentifier(remoteDestination.getName());
} else {
return getInputPortIdentifier(remoteDestination.getName());
}
}
-
+
public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnection(direction, null);
}
-
-
- public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
- //
+
+ public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config)
+ throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ //
// Attempt to get a connection state that already exists for this URL.
//
FlowFileCodec codec = null;
@@ -255,42 +253,42 @@ public class EndpointConnectionPool {
logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction);
logger.debug("{} next peer status = {}", this, peerStatus);
- if ( peerStatus == null ) {
+ if (peerStatus == null) {
return null;
}
final PeerDescription peerDescription = peerStatus.getPeerDescription();
BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerStatus);
- if ( connectionQueue == null ) {
+ if (connectionQueue == null) {
connectionQueue = new LinkedBlockingQueue<>();
BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
- if ( existing != null ) {
+ if (existing != null) {
connectionQueue = existing;
}
}
-
+
final List<EndpointConnection> addBack = new ArrayList<>();
try {
do {
connection = connectionQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
final String portId = getPortIdentifier(direction);
-
- if ( connection == null && !addBack.isEmpty() ) {
+
+ if (connection == null && !addBack.isEmpty()) {
// all available connections have been penalized.
logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
return null;
}
-
- if ( connection != null && connection.getPeer().isPenalized(portId) ) {
+
+ if (connection != null && connection.getPeer().isPenalized(portId)) {
// we have a connection, but it's penalized. We want to add it back to the queue
// when we've found one to use.
addBack.add(connection);
continue;
}
-
+
// if we can't get an existing Connection, create one
- if ( connection == null ) {
+ if (connection == null) {
logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
protocol = new SocketClientProtocol();
protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
@@ -304,7 +302,7 @@ public class EndpointConnectionPool {
penalize(peerStatus.getPeerDescription(), penalizationMillis);
throw ioe;
}
-
+
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
try {
@@ -314,72 +312,72 @@ public class EndpointConnectionPool {
try {
commsSession.close();
} catch (final IOException ioe) {
- throw e;
+ throw e;
}
}
-
+
final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort();
peer = new Peer(peerDescription, commsSession, peerUrl, clusterUrl.toString());
-
+
// set properties based on config
- if ( config != null ) {
+ if (config != null) {
protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
protocol.setPreferredBatchCount(config.getPreferredBatchCount());
protocol.setPreferredBatchSize(config.getPreferredBatchSize());
protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
}
-
+
// perform handshake
try {
logger.debug("{} performing handshake", this);
protocol.handshake(peer);
-
+
// handle error cases
- if ( protocol.isDestinationFull() ) {
- logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer",
- this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName());
-
+ if (protocol.isDestinationFull()) {
+ logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer",
+ this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName());
+
penalize(peer, penalizationMillis);
try {
- peer.close();
+ peer.close();
} catch (final IOException ioe) {
}
-
+
continue;
- } else if ( protocol.isPortInvalid() ) {
- penalize(peer, penalizationMillis);
- cleanup(protocol, peer);
- throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running");
- } else if ( protocol.isPortUnknown() ) {
- penalize(peer, penalizationMillis);
- cleanup(protocol, peer);
- throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known");
+ } else if (protocol.isPortInvalid()) {
+ penalize(peer, penalizationMillis);
+ cleanup(protocol, peer);
+ throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running");
+ } else if (protocol.isPortUnknown()) {
+ penalize(peer, penalizationMillis);
+ cleanup(protocol, peer);
+ throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known");
}
-
+
// negotiate the FlowFileCodec to use
logger.debug("{} negotiating codec", this);
codec = protocol.negotiateCodec(peer);
logger.debug("{} negotiated codec is {}", this, codec);
} catch (final PortNotRunningException | UnknownPortException e) {
- throw e;
+ throw e;
} catch (final Exception e) {
penalize(peer, penalizationMillis);
cleanup(protocol, peer);
-
+
final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
error(message);
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.error("", e);
}
throw e;
}
-
+
connection = new EndpointConnection(peer, protocol, codec);
} else {
final long lastTimeUsed = connection.getLastTimeUsed();
final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
-
- if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) {
+
+ if (commsTimeout > 0L && millisSinceLastUse >= commsTimeout) {
cleanup(connection.getSocketClientProtocol(), connection.getPeer());
connection = null;
} else {
@@ -389,68 +387,70 @@ public class EndpointConnectionPool {
protocol = connection.getSocketClientProtocol();
}
}
- } while ( connection == null || codec == null || commsSession == null || protocol == null );
+ } while (connection == null || codec == null || commsSession == null || protocol == null);
} catch (final Throwable t) {
- if ( commsSession != null ) {
- try {
- commsSession.close();
- } catch (final IOException ioe) {
- }
- }
-
- throw t;
+ if (commsSession != null) {
+ try {
+ commsSession.close();
+ } catch (final IOException ioe) {
+ }
+ }
+
+ throw t;
} finally {
- if ( !addBack.isEmpty() ) {
+ if (!addBack.isEmpty()) {
connectionQueue.addAll(addBack);
}
}
-
+
activeConnections.add(connection);
return connection;
}
-
-
+
public boolean offer(final EndpointConnection endpointConnection) {
- final Peer peer = endpointConnection.getPeer();
- if ( peer == null ) {
- return false;
- }
-
- final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
- if ( connectionQueue == null ) {
- return false;
- }
-
- activeConnections.remove(endpointConnection);
- if ( shutdown ) {
- terminate(endpointConnection);
- return false;
- } else {
- endpointConnection.setLastTimeUsed();
- return connectionQueue.offer(endpointConnection);
- }
- }
-
+ final Peer peer = endpointConnection.getPeer();
+ if (peer == null) {
+ return false;
+ }
+
+ final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
+ if (connectionQueue == null) {
+ return false;
+ }
+
+ activeConnections.remove(endpointConnection);
+ if (shutdown) {
+ terminate(endpointConnection);
+ return false;
+ } else {
+ endpointConnection.setLastTimeUsed();
+ return connectionQueue.offer(endpointConnection);
+ }
+ }
+
private void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
Long expiration = peerTimeoutExpirations.get(peerDescription);
- if ( expiration == null ) {
+ if (expiration == null) {
expiration = Long.valueOf(0L);
}
-
+
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
}
-
+
/**
- * Updates internal state map to penalize a PeerStatus that points to the specified peer
- * @param peer
+ * Updates internal state map to penalize a PeerStatus that points to the
+ * specified peer
+ *
+ * @param peer the peer
+ * @param penalizationMillis period of time to penalize a given peer
*/
public void penalize(final Peer peer, final long penalizationMillis) {
penalize(peer.getDescription(), penalizationMillis);
}
-
+
private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
- if ( protocol != null && peer != null ) {
+ if (protocol != null && peer != null) {
try {
protocol.shutdown(peer);
} catch (final TransmissionDisabledException e) {
@@ -459,8 +459,8 @@ public class EndpointConnectionPool {
} catch (IOException e1) {
}
}
-
- if ( peer != null ) {
+
+ if (peer != null) {
try {
peer.close();
} catch (final TransmissionDisabledException e) {
@@ -470,15 +470,14 @@ public class EndpointConnectionPool {
}
}
}
-
+
private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
}
-
-
+
private PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses;
- if ( isPeerRefreshNeeded(peerList) ) {
+ if (isPeerRefreshNeeded(peerList)) {
peerRefreshLock.lock();
try {
// now that we have the lock, check again that we need to refresh (because another thread
@@ -490,15 +489,15 @@ public class EndpointConnectionPool {
} catch (final Exception e) {
final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
warn(message);
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.warn("", e);
}
-
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
+
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
}
}
-
+
this.peerStatuses = peerList;
peerRefreshTime = System.currentTimeMillis();
}
@@ -507,46 +506,46 @@ public class EndpointConnectionPool {
}
}
- if ( peerList == null || peerList.isEmpty() ) {
+ if (peerList == null || peerList.isEmpty()) {
return null;
}
PeerStatus peerStatus;
- for (int i=0; i < peerList.size(); i++) {
+ for (int i = 0; i < peerList.size(); i++) {
final long idx = peerIndex.getAndIncrement();
final int listIndex = (int) (idx % peerList.size());
peerStatus = peerList.get(listIndex);
-
- if ( isPenalized(peerStatus) ) {
+
+ if (isPenalized(peerStatus)) {
logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
} else {
return peerStatus;
}
}
-
+
logger.debug("{} All peers appear to be penalized; returning null", this);
return null;
}
-
+
private boolean isPenalized(final PeerStatus peerStatus) {
final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
- return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() );
+ return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis());
}
-
+
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
Set<PeerStatus> statuses = getPeerStatuses();
- if ( statuses == null ) {
+ if (statuses == null) {
refreshPeers();
statuses = getPeerStatuses();
- if ( statuses == null ) {
+ if (statuses == null) {
logger.debug("{} found no peers to connect to", this);
return Collections.emptyList();
}
}
-
+
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
final List<NodeInformation> nodeInfos = new ArrayList<>();
- for ( final PeerStatus peerStatus : statuses ) {
+ for (final PeerStatus peerStatus : statuses) {
final PeerDescription description = peerStatus.getPeerDescription();
final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount());
nodeInfos.add(nodeInfo);
@@ -554,8 +553,7 @@ public class EndpointConnectionPool {
clusterNodeInfo.setNodeInformation(nodeInfos);
return formulateDestinationList(clusterNodeInfo, direction);
}
-
-
+
private Set<PeerStatus> getPeerStatuses() {
final PeerStatusCache cache = this.peerStatusCache;
if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
@@ -576,14 +574,14 @@ public class EndpointConnectionPool {
}
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
- final String hostname = clusterUrl.getHost();
+ final String hostname = clusterUrl.getHost();
final Integer port = getSiteToSitePort();
- if ( port == null ) {
+ if (port == null) {
throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
}
-
+
final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
- final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
+ final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
final SocketClientProtocol clientProtocol = new SocketClientProtocol();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
@@ -593,11 +591,11 @@ public class EndpointConnectionPool {
clientProtocol.setTimeout(commsTimeout);
if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
String portId = getPortIdentifier(TransferDirection.RECEIVE);
- if ( portId == null ) {
+ if (portId == null) {
portId = getPortIdentifier(TransferDirection.SEND);
}
-
- if ( portId == null ) {
+
+ if (portId == null) {
peer.close();
throw new IOException("Failed to determine the identifier of port " + remoteDestination.getName());
}
@@ -605,7 +603,7 @@ public class EndpointConnectionPool {
} else {
clientProtocol.handshake(peer, null);
}
-
+
final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
persistPeerStatuses(peerStatuses);
@@ -632,14 +630,13 @@ public class EndpointConnectionPool {
return peerStatuses;
}
-
private void persistPeerStatuses(final Set<PeerStatus> statuses) {
- if ( peersFile == null ) {
- return;
- }
-
+ if (peersFile == null) {
+ return;
+ }
+
try (final OutputStream fos = new FileOutputStream(peersFile);
- final OutputStream out = new BufferedOutputStream(fos)) {
+ final OutputStream out = new BufferedOutputStream(fos)) {
for (final PeerStatus status : statuses) {
final PeerDescription description = status.getPeerDescription();
@@ -679,53 +676,52 @@ public class EndpointConnectionPool {
return statuses;
}
-
-
+
private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
final PeerDescription description = peerStatus.getPeerDescription();
- return establishSiteToSiteConnection(description.getHostname(), description.getPort());
+ return establishSiteToSiteConnection(description.getHostname(), description.getPort());
}
-
+
private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
- final boolean siteToSiteSecure = isSecure();
+ final boolean siteToSiteSecure = isSecure();
final String destinationUri = "nifi://" + hostname + ":" + port;
CommunicationsSession commsSession = null;
try {
- if ( siteToSiteSecure ) {
- if ( sslContext == null ) {
- throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
- }
-
- final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
- socketChannel.connect();
-
- commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
- try {
- commsSession.setUserDn(socketChannel.getDn());
- } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
- throw new IOException(ex);
- }
- } else {
- final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
- commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
- }
-
- commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
- commsSession.setUri(destinationUri);
+ if (siteToSiteSecure) {
+ if (sslContext == null) {
+ throw new IOException("Unable to communicate with " + hostname + ":" + port
+ + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
+ }
+
+ final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+ socketChannel.connect();
+
+ commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+
+ try {
+ commsSession.setUserDn(socketChannel.getDn());
+ } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
+ throw new IOException(ex);
+ }
+ } else {
+ final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+ commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
+ }
+
+ commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+ commsSession.setUri(destinationUri);
} catch (final IOException ioe) {
- if ( commsSession != null ) {
+ if (commsSession != null) {
commsSession.close();
}
-
+
throw ioe;
}
-
+
return commsSession;
}
-
-
+
static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
final int numDestinations = Math.max(128, nodeInfoSet.size());
@@ -743,26 +739,26 @@ public class EndpointConnectionPool {
final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
-
+
entryCountMap.put(nodeInfo, Math.max(1, entries));
totalEntries += entries;
}
-
+
final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
- for (int i=0; i < totalEntries; i++) {
+ for (int i = 0; i < totalEntries; i++) {
destinations.add(null);
}
- for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
+ for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
final NodeInformation nodeInfo = entry.getKey();
final int numEntries = entry.getValue();
-
+
int skipIndex = numEntries;
- for (int i=0; i < numEntries; i++) {
+ for (int i = 0; i < numEntries; i++) {
int n = (skipIndex * i);
while (true) {
final int index = n % destinations.size();
PeerStatus status = destinations.get(index);
- if ( status == null ) {
+ if (status == null) {
final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure());
status = new PeerStatus(description, nodeInfo.getTotalFlowFiles());
destinations.set(index, status);
@@ -776,7 +772,7 @@ public class EndpointConnectionPool {
final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New Weighted Distribution of Nodes:");
- for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
+ for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
final double percentage = entry.getValue() * 100D / (double) destinations.size();
distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
}
@@ -785,55 +781,54 @@ public class EndpointConnectionPool {
// Jumble the list of destinations.
return destinations;
}
-
-
+
private void cleanupExpiredSockets() {
- for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
+ for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
final List<EndpointConnection> connections = new ArrayList<>();
-
+
EndpointConnection connection;
while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = connection.getLastTimeUsed();
- if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
+ if (lastUsed < System.currentTimeMillis() - idleExpirationMillis) {
try {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
- logger.debug("Failed to shut down {} using {} due to {}",
- new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
+ logger.debug("Failed to shut down {} using {} due to {}",
+ new Object[]{connection.getSocketClientProtocol(), connection.getPeer(), e});
}
-
+
terminate(connection);
} else {
connections.add(connection);
}
}
-
+
connectionQueue.addAll(connections);
}
}
-
+
public void shutdown() {
shutdown = true;
- taskExecutor.shutdown();
- peerTimeoutExpirations.clear();
-
- for ( final EndpointConnection conn : activeConnections ) {
- conn.getPeer().getCommunicationsSession().interrupt();
+ taskExecutor.shutdown();
+ peerTimeoutExpirations.clear();
+
+ for (final EndpointConnection conn : activeConnections) {
+ conn.getPeer().getCommunicationsSession().interrupt();
}
- for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values() ) {
+ for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
EndpointConnection state;
- while ( (state = connectionQueue.poll()) != null) {
+ while ((state = connectionQueue.poll()) != null) {
cleanup(state.getSocketClientProtocol(), state.getPeer());
}
}
}
-
+
public void terminate(final EndpointConnection connection) {
cleanup(connection.getSocketClientProtocol(), connection.getPeer());
}
-
+
private void refreshPeers() {
final PeerStatusCache existingCache = peerStatusCache;
if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
@@ -851,69 +846,66 @@ public class EndpointConnectionPool {
}
}
}
-
-
+
public String getInputPortIdentifier(final String portName) throws IOException {
return getPortIdentifier(portName, inputPortMap);
}
-
+
public String getOutputPortIdentifier(final String portName) throws IOException {
- return getPortIdentifier(portName, outputPortMap);
+ return getPortIdentifier(portName, outputPortMap);
}
-
-
+
private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException {
- String identifier;
- remoteInfoReadLock.lock();
+ String identifier;
+ remoteInfoReadLock.lock();
try {
- identifier = portMap.get(portName);
+ identifier = portMap.get(portName);
} finally {
- remoteInfoReadLock.unlock();
+ remoteInfoReadLock.unlock();
}
-
- if ( identifier != null ) {
- return identifier;
+
+ if (identifier != null) {
+ return identifier;
}
-
+
refreshRemoteInfo();
- remoteInfoReadLock.lock();
+ remoteInfoReadLock.lock();
try {
- return portMap.get(portName);
+ return portMap.get(portName);
} finally {
- remoteInfoReadLock.unlock();
+ remoteInfoReadLock.unlock();
}
}
-
-
+
private ControllerDTO refreshRemoteInfo() throws IOException {
- final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
+ final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? sslContext : null);
- final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout);
-
+ final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout);
+
remoteInfoWriteLock.lock();
try {
this.siteToSitePort = controller.getRemoteSiteListeningPort();
this.siteToSiteSecure = controller.isSiteToSiteSecure();
-
+
inputPortMap.clear();
for (final PortDTO inputPort : controller.getInputPorts()) {
- inputPortMap.put(inputPort.getName(), inputPort.getId());
+ inputPortMap.put(inputPort.getName(), inputPort.getId());
}
-
+
outputPortMap.clear();
- for ( final PortDTO outputPort : controller.getOutputPorts()) {
- outputPortMap.put(outputPort.getName(), outputPort.getId());
+ for (final PortDTO outputPort : controller.getOutputPorts()) {
+ outputPortMap.put(outputPort.getName(), outputPort.getId());
}
-
+
this.remoteRefreshTime = System.currentTimeMillis();
} finally {
- remoteInfoWriteLock.unlock();
+ remoteInfoWriteLock.unlock();
}
-
+
return controller;
}
-
+
/**
* @return the port that the remote instance is listening on for
* site-to-site communication, or <code>null</code> if the remote instance
@@ -930,7 +922,7 @@ public class EndpointConnectionPool {
return listeningPort;
}
} finally {
- remoteInfoReadLock.unlock();
+ remoteInfoReadLock.unlock();
}
final ControllerDTO controller = refreshRemoteInfo();
@@ -938,19 +930,16 @@ public class EndpointConnectionPool {
return listeningPort;
}
-
+
@Override
public String toString() {
return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]";
}
-
-
+
/**
- * Returns {@code true} if the remote instance is configured for secure site-to-site communications,
- * {@code false} otherwise.
- *
- * @return
- * @throws IOException
+ * @return {@code true} if the remote instance is configured for secure
+ * site-to-site communications, {@code false} otherwise
+ * @throws IOException if unable to check if secure
*/
public boolean isSecure() throws IOException {
remoteInfoReadLock.lock();
@@ -960,23 +949,23 @@ public class EndpointConnectionPool {
return secure;
}
} finally {
- remoteInfoReadLock.unlock();
+ remoteInfoReadLock.unlock();
}
final ControllerDTO controller = refreshRemoteInfo();
final Boolean isSecure = controller.isSiteToSiteSecure();
- if ( isSecure == null ) {
+ if (isSecure == null) {
throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
}
-
+
return isSecure;
}
-
-
+
private class IdEnrichedRemoteDestination implements RemoteDestination {
+
private final RemoteDestination original;
private final String identifier;
-
+
public IdEnrichedRemoteDestination(final RemoteDestination original, final String identifier) {
this.original = original;
this.identifier = identifier;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 4aab3f7..33e4a66 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -33,71 +33,71 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketClient implements SiteToSiteClient {
+
private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);
-
+
private final SiteToSiteClientConfig config;
- private final EndpointConnectionPool pool;
- private final boolean compress;
- private final String portName;
- private final long penalizationNanos;
- private volatile String portIdentifier;
- private volatile boolean closed = false;
-
- public SocketClient(final SiteToSiteClientConfig config) {
- pool = new EndpointConnectionPool(config.getUrl(),
- createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
- (int) config.getTimeout(TimeUnit.MILLISECONDS),
- (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
- config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
-
- this.config = config;
- this.compress = config.isUseCompression();
- this.portIdentifier = config.getPortIdentifier();
- this.portName = config.getPortName();
- this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
- }
-
- @Override
- public SiteToSiteClientConfig getConfig() {
- return config;
- }
-
- @Override
- public boolean isSecure() throws IOException {
- return pool.isSecure();
- }
-
- private String getPortIdentifier(final TransferDirection direction) throws IOException {
- final String id = this.portIdentifier;
- if ( id != null ) {
- return id;
- }
-
- final String portId;
- if ( direction == TransferDirection.SEND ) {
- portId = pool.getInputPortIdentifier(this.portName);
- } else {
- portId = pool.getOutputPortIdentifier(this.portName);
- }
-
- if (portId == null) {
- logger.debug("Unable to resolve port [{}] to an identifier", portName);
- } else {
- logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
- this.portIdentifier = portId;
- }
-
- return portId;
- }
-
-
- private RemoteDestination createRemoteDestination(final String portId, final String portName) {
- return new RemoteDestination() {
+ private final EndpointConnectionPool pool;
+ private final boolean compress;
+ private final String portName;
+ private final long penalizationNanos;
+ private volatile String portIdentifier;
+ private volatile boolean closed = false;
+
+ public SocketClient(final SiteToSiteClientConfig config) {
+ pool = new EndpointConnectionPool(config.getUrl(),
+ createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
+ (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
+ config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
+
+ this.config = config;
+ this.compress = config.isUseCompression();
+ this.portIdentifier = config.getPortIdentifier();
+ this.portName = config.getPortName();
+ this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public SiteToSiteClientConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public boolean isSecure() throws IOException {
+ return pool.isSecure();
+ }
+
+ private String getPortIdentifier(final TransferDirection direction) throws IOException {
+ final String id = this.portIdentifier;
+ if (id != null) {
+ return id;
+ }
+
+ final String portId;
+ if (direction == TransferDirection.SEND) {
+ portId = pool.getInputPortIdentifier(this.portName);
+ } else {
+ portId = pool.getOutputPortIdentifier(this.portName);
+ }
+
+ if (portId == null) {
+ logger.debug("Unable to resolve port [{}] to an identifier", portName);
+ } else {
+ logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
+ this.portIdentifier = portId;
+ }
+
+ return portId;
+ }
+
+ private RemoteDestination createRemoteDestination(final String portId, final String portName) {
+ return new RemoteDestination() {
@Override
public String getIdentifier() {
return portId;
}
-
+
@Override
public String getName() {
return portName;
@@ -113,113 +113,112 @@ public class SocketClient implements SiteToSiteClient {
return compress;
}
};
- }
-
- @Override
- public Transaction createTransaction(final TransferDirection direction) throws IOException {
- if ( closed ) {
- throw new IllegalStateException("Client is closed");
- }
- final String portId = getPortIdentifier(direction);
-
- if ( portId == null ) {
- throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
- }
-
- final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
- if ( connectionState == null ) {
- return null;
- }
-
- final Transaction transaction;
- try {
- transaction = connectionState.getSocketClientProtocol().startTransaction(
- connectionState.getPeer(), connectionState.getCodec(), direction);
- } catch (final Throwable t) {
- pool.terminate(connectionState);
- throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);
- }
-
- // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
- // the transaction is either completed or canceled.
- final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState);
- return new Transaction() {
- @Override
- public void confirm() throws IOException {
- transaction.confirm();
- }
-
- @Override
- public TransactionCompletion complete() throws IOException {
- try {
- return transaction.complete();
- } finally {
- final EndpointConnection state = connectionStateRef.get();
- if ( state != null ) {
- pool.offer(connectionState);
- connectionStateRef.set(null);
- }
- }
- }
-
- @Override
- public void cancel(final String explanation) throws IOException {
- try {
- transaction.cancel(explanation);
- } finally {
+ }
+
+ @Override
+ public Transaction createTransaction(final TransferDirection direction) throws IOException {
+ if (closed) {
+ throw new IllegalStateException("Client is closed");
+ }
+ final String portId = getPortIdentifier(direction);
+
+ if (portId == null) {
+ throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
+ }
+
+ final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
+ if (connectionState == null) {
+ return null;
+ }
+
+ final Transaction transaction;
+ try {
+ transaction = connectionState.getSocketClientProtocol().startTransaction(
+ connectionState.getPeer(), connectionState.getCodec(), direction);
+ } catch (final Throwable t) {
+ pool.terminate(connectionState);
+ throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);
+ }
+
+ // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
+ // the transaction is either completed or canceled.
+ final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState);
+ return new Transaction() {
+ @Override
+ public void confirm() throws IOException {
+ transaction.confirm();
+ }
+
+ @Override
+ public TransactionCompletion complete() throws IOException {
+ try {
+ return transaction.complete();
+ } finally {
final EndpointConnection state = connectionStateRef.get();
- if ( state != null ) {
+ if (state != null) {
+ pool.offer(connectionState);
+ connectionStateRef.set(null);
+ }
+ }
+ }
+
+ @Override
+ public void cancel(final String explanation) throws IOException {
+ try {
+ transaction.cancel(explanation);
+ } finally {
+ final EndpointConnection state = connectionStateRef.get();
+ if (state != null) {
pool.terminate(connectionState);
connectionStateRef.set(null);
}
- }
- }
-
- @Override
- public void error() {
- try {
- transaction.error();
- } finally {
+ }
+ }
+
+ @Override
+ public void error() {
+ try {
+ transaction.error();
+ } finally {
final EndpointConnection state = connectionStateRef.get();
- if ( state != null ) {
+ if (state != null) {
pool.terminate(connectionState);
connectionStateRef.set(null);
}
- }
- }
-
- @Override
- public void send(final DataPacket dataPacket) throws IOException {
- transaction.send(dataPacket);
- }
-
- @Override
- public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
- transaction.send(content, attributes);
- }
-
- @Override
- public DataPacket receive() throws IOException {
- return transaction.receive();
- }
-
- @Override
- public TransactionState getState() throws IOException {
- return transaction.getState();
- }
-
- @Override
- public Communicant getCommunicant() {
- return transaction.getCommunicant();
- }
- };
- }
-
-
- @Override
- public void close() throws IOException {
- closed = true;
- pool.shutdown();
- }
-
+ }
+ }
+
+ @Override
+ public void send(final DataPacket dataPacket) throws IOException {
+ transaction.send(dataPacket);
+ }
+
+ @Override
+ public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+ transaction.send(content, attributes);
+ }
+
+ @Override
+ public DataPacket receive() throws IOException {
+ return transaction.receive();
+ }
+
+ @Override
+ public TransactionState getState() throws IOException {
+ return transaction.getState();
+ }
+
+ @Override
+ public Communicant getCommunicant() {
+ return transaction.getCommunicant();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ pool.shutdown();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
index 1380e1b..e79fc47 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -38,13 +38,13 @@ public interface FlowFileCodec extends VersionedRemoteResource {
* Returns a List of all versions that this codec is able to support, in the
* order that they are preferred by the codec
*
- * @return
+ * @return all supported versions
*/
public List<Integer> getSupportedVersions();
/**
- * Encodes a DataPacket and its content as a single stream of data and writes
- * that stream to the output.
+ * Encodes a DataPacket and its content as a single stream of data and
+ * writes that stream to the output.
*
* @param dataPacket the data to serialize
* @param outStream the stream to write the data to
@@ -58,12 +58,13 @@ public interface FlowFileCodec extends VersionedRemoteResource {
* Decodes the contents of the InputStream, interpreting the data to
* determine the next DataPacket's attributes and content.
*
- * @param stream an InputStream containing DataPacket's content and attributes
+ * @param stream an InputStream containing DataPacket's content and
+ * attributes
*
- * @return the DataPacket that was created, or <code>null</code> if the stream
- * was out of data
+ * @return the DataPacket that was created, or <code>null</code> if the
+ * stream was out of data
*
- * @throws IOException
+ * @throws IOException if unable to read stream
* @throws ProtocolException if the input is malformed
* @throws TransmissionDisabledException if a user terminates the connection
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index 6fd92de..0bee537 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -34,7 +34,8 @@ import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.StreamUtils;
public class StandardFlowFileCodec implements FlowFileCodec {
- public static final int MAX_NUM_ATTRIBUTES = 25000;
+
+ public static final int MAX_NUM_ATTRIBUTES = 25000;
public static final String DEFAULT_FLOWFILE_PATH = "./";
@@ -43,30 +44,29 @@ public class StandardFlowFileCodec implements FlowFileCodec {
public StandardFlowFileCodec() {
versionNegotiator = new StandardVersionNegotiator(1);
}
-
+
@Override
public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException {
final DataOutputStream out = new DataOutputStream(encodedOut);
-
+
final Map<String, String> attributes = dataPacket.getAttributes();
out.writeInt(attributes.size());
- for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
+ for (final Map.Entry<String, String> entry : attributes.entrySet()) {
writeString(entry.getKey(), out);
writeString(entry.getValue(), out);
}
-
+
out.writeLong(dataPacket.getSize());
-
+
final InputStream in = dataPacket.getData();
StreamUtils.copy(in, encodedOut);
encodedOut.flush();
}
-
@Override
public DataPacket decode(final InputStream stream) throws IOException, ProtocolException {
final DataInputStream in = new DataInputStream(stream);
-
+
final int numAttributes;
try {
numAttributes = in.readInt();
@@ -74,22 +74,22 @@ public class StandardFlowFileCodec implements FlowFileCodec {
// we're out of data.
return null;
}
-
+
// This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will
// generally result in an OutOfMemoryError.
- if ( numAttributes > MAX_NUM_ATTRIBUTES ) {
- throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes);
+ if (numAttributes > MAX_NUM_ATTRIBUTES) {
+ throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes);
}
-
+
final Map<String, String> attributes = new HashMap<>(numAttributes);
- for (int i=0; i < numAttributes; i++) {
+ for (int i = 0; i < numAttributes; i++) {
final String attrName = readString(in);
final String attrValue = readString(in);
attributes.put(attrName, attrValue);
}
-
+
final long numBytes = in.readLong();
-
+
return new StandardDataPacket(attributes, stream, numBytes);
}
@@ -99,14 +99,13 @@ public class StandardFlowFileCodec implements FlowFileCodec {
out.write(bytes);
}
-
private String readString(final DataInputStream in) throws IOException {
final int numBytes = in.readInt();
final byte[] bytes = new byte[numBytes];
StreamUtils.fillBuffer(in, bytes, true);
return new String(bytes, "UTF-8");
}
-
+
@Override
public List<Integer> getSupportedVersions() {
return versionNegotiator.getSupportedVersions();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
index d4d55e1..198aaef 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -18,13 +18,14 @@ package org.apache.nifi.remote.exception;
import java.io.IOException;
-
/**
- * A HandshakeException occurs when the client and the remote NiFi instance do not agree
- * on some condition during the handshake. For example, if the NiFi instance does not recognize
- * one of the parameters that the client passes during the Handshaking phase.
+ * A HandshakeException occurs when the client and the remote NiFi instance do
+ * not agree on some condition during the handshake. For example, if the NiFi
+ * instance does not recognize one of the parameters that the client passes
+ * during the Handshaking phase.
*/
public class HandshakeException extends IOException {
+
private static final long serialVersionUID = 178192341908726L;
public HandshakeException(final String message) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
index 8b97832..09fc05c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -17,11 +17,12 @@
package org.apache.nifi.remote.exception;
/**
- * PortNotRunningException occurs when the remote NiFi instance reports
- * that the Port that the client is attempting to communicate with is not
- * currently running and therefore communications with that Port are not allowed.
+ * PortNotRunningException occurs when the remote NiFi instance reports that the
+ * Port that the client is attempting to communicate with is not currently
+ * running and therefore communications with that Port are not allowed.
*/
public class PortNotRunningException extends ProtocolException {
+
private static final long serialVersionUID = -2790940982005516375L;
public PortNotRunningException(final String message) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index 45a4e15..cc6ae50 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -19,8 +19,8 @@ package org.apache.nifi.remote.exception;
import java.io.IOException;
/**
- * A ProtocolException occurs when unexpected data is received, for example
- * an invalid Response Code.
+ * A ProtocolException occurs when unexpected data is received, for example an
+ * invalid Response Code.
*/
public class ProtocolException extends IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
index 592a1b3..4249075 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -17,10 +17,12 @@
package org.apache.nifi.remote.exception;
/**
- * An UnknownPortException indicates that the remote NiFi instance has reported that
- * the endpoint that the client attempted to communicate with does not exist.
+ * An UnknownPortException indicates that the remote NiFi instance has reported
+ * that the endpoint that the client attempted to communicate with does not
+ * exist.
*/
public class UnknownPortException extends ProtocolException {
+
private static final long serialVersionUID = -2790940982005516375L;
public UnknownPortException(final String message) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
index 8065f57..6180c3c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
@@ -22,11 +22,12 @@ import java.nio.channels.SocketChannel;
import org.apache.nifi.remote.AbstractCommunicationsSession;
public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession {
+
private final SocketChannel channel;
private final SocketChannelInput request;
private final SocketChannelOutput response;
private int timeout = 30000;
-
+
public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException {
super(uri);
request = new SocketChannelInput(socketChannel);
@@ -34,12 +35,12 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
channel = socketChannel;
socketChannel.configureBlocking(false);
}
-
+
@Override
public boolean isClosed() {
return !channel.isConnected();
}
-
+
@Override
public SocketChannelInput getInput() {
return request;
@@ -65,28 +66,28 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
@Override
public void close() throws IOException {
IOException suppressed = null;
-
+
try {
request.consume();
} catch (final IOException ioe) {
suppressed = ioe;
}
-
+
try {
channel.close();
} catch (final IOException ioe) {
- if ( suppressed != null ) {
+ if (suppressed != null) {
ioe.addSuppressed(suppressed);
}
-
+
throw ioe;
}
-
- if ( suppressed != null ) {
+
+ if (suppressed != null) {
throw suppressed;
}
}
-
+
@Override
public boolean isDataAvailable() {
return request.isDataAvailable();
@@ -101,7 +102,7 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
public long getBytesRead() {
return request.getBytesRead();
}
-
+
@Override
public void interrupt() {
request.interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
index 7dffddd..68a8dc4 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -26,18 +26,19 @@ import org.apache.nifi.remote.io.InterruptableInputStream;
import org.apache.nifi.remote.protocol.CommunicationsInput;
public class SocketChannelInput implements CommunicationsInput {
+
private final SocketChannelInputStream socketIn;
private final ByteCountingInputStream countingIn;
private final InputStream bufferedIn;
private final InterruptableInputStream interruptableIn;
-
+
public SocketChannelInput(final SocketChannel socketChannel) throws IOException {
this.socketIn = new SocketChannelInputStream(socketChannel);
countingIn = new ByteCountingInputStream(socketIn);
bufferedIn = new BufferedInputStream(countingIn);
interruptableIn = new InterruptableInputStream(bufferedIn);
}
-
+
@Override
public InputStream getInputStream() throws IOException {
return interruptableIn;
@@ -46,7 +47,7 @@ public class SocketChannelInput implements CommunicationsInput {
public void setTimeout(final int millis) {
socketIn.setTimeout(millis);
}
-
+
public boolean isDataAvailable() {
try {
return interruptableIn.available() > 0;
@@ -54,12 +55,12 @@ public class SocketChannelInput implements CommunicationsInput {
return false;
}
}
-
+
@Override
public long getBytesRead() {
return countingIn.getBytesRead();
}
-
+
public void interrupt() {
interruptableIn.interrupt();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
index 26c0164..13974a5 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
@@ -26,32 +26,33 @@ import org.apache.nifi.remote.io.InterruptableOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsOutput;
public class SocketChannelOutput implements CommunicationsOutput {
+
private final SocketChannelOutputStream socketOutStream;
private final ByteCountingOutputStream countingOut;
private final OutputStream bufferedOut;
private final InterruptableOutputStream interruptableOut;
-
+
public SocketChannelOutput(final SocketChannel socketChannel) throws IOException {
socketOutStream = new SocketChannelOutputStream(socketChannel);
countingOut = new ByteCountingOutputStream(socketOutStream);
bufferedOut = new BufferedOutputStream(countingOut);
interruptableOut = new InterruptableOutputStream(bufferedOut);
}
-
+
@Override
public OutputStream getOutputStream() throws IOException {
return interruptableOut;
}
-
+
public void setTimeout(final int timeout) {
socketOutStream.setTimeout(timeout);
}
-
+
@Override
public long getBytesWritten() {
return countingOut.getBytesWritten();
}
-
+
public void interrupt() {
interruptableOut.interrupt();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
index 50e9162..5e5abc7 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
@@ -21,17 +21,18 @@ import java.io.IOException;
import org.apache.nifi.remote.AbstractCommunicationsSession;
public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
+
private final SSLSocketChannel channel;
private final SSLSocketChannelInput request;
private final SSLSocketChannelOutput response;
-
+
public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
super(uri);
request = new SSLSocketChannelInput(channel);
response = new SSLSocketChannelOutput(channel);
this.channel = channel;
}
-
+
@Override
public SSLSocketChannelInput getInput() {
return request;
@@ -55,33 +56,33 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication
@Override
public void close() throws IOException {
IOException suppressed = null;
-
+
try {
request.consume();
} catch (final IOException ioe) {
suppressed = ioe;
}
-
+
try {
channel.close();
} catch (final IOException ioe) {
- if ( suppressed != null ) {
+ if (suppressed != null) {
ioe.addSuppressed(suppressed);
}
-
+
throw ioe;
}
-
- if ( suppressed != null ) {
+
+ if (suppressed != null) {
throw suppressed;
}
}
-
+
@Override
public boolean isClosed() {
return channel.isClosed();
}
-
+
@Override
public boolean isDataAvailable() {
try {
@@ -105,7 +106,7 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication
public void interrupt() {
channel.interrupt();
}
-
+
@Override
public String toString() {
return super.toString() + "[SSLSocketChannel=" + channel + "]";
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
index 01fb9f2..6cd2344 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
@@ -24,25 +24,26 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.remote.protocol.CommunicationsInput;
public class SSLSocketChannelInput implements CommunicationsInput {
+
private final SSLSocketChannelInputStream in;
private final ByteCountingInputStream countingIn;
private final InputStream bufferedIn;
-
+
public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
in = new SSLSocketChannelInputStream(socketChannel);
countingIn = new ByteCountingInputStream(in);
this.bufferedIn = new BufferedInputStream(countingIn);
}
-
+
@Override
public InputStream getInputStream() throws IOException {
return bufferedIn;
}
-
+
public boolean isDataAvailable() throws IOException {
return bufferedIn.available() > 0;
}
-
+
@Override
public long getBytesRead() {
return countingIn.getBytesRead();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
index dc3d68f..33d13cb 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
@@ -24,9 +24,10 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsOutput;
public class SSLSocketChannelOutput implements CommunicationsOutput {
+
private final OutputStream out;
private final ByteCountingOutputStream countingOut;
-
+
public SSLSocketChannelOutput(final SSLSocketChannel channel) {
countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
out = new BufferedOutputStream(countingOut);
@@ -36,7 +37,7 @@ public class SSLSocketChannelOutput implements CommunicationsOutput {
public OutputStream getOutputStream() throws IOException {
return out;
}
-
+
@Override
public long getBytesWritten() {
return countingOut.getBytesWritten();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 36a0e8d..2efea11 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -48,37 +48,27 @@ public interface ClientProtocol extends VersionedRemoteResource {
boolean isReadyForFileTransfer();
-
-
-
Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException;
-
-
+
/**
- * returns <code>true</code> if remote instance indicates that the port is
+ * @return <code>true</code> if remote instance indicates that the port is
* invalid
- *
- * @return
* @throws IllegalStateException if a handshake has not successfully
* completed
*/
boolean isPortInvalid() throws IllegalStateException;
/**
- * returns <code>true</code> if remote instance indicates that the port is
+ * @return <code>true</code> if remote instance indicates that the port is
* unknown
- *
- * @return
* @throws IllegalStateException if a handshake has not successfully
* completed
*/
boolean isPortUnknown();
/**
- * returns <code>true</code> if remote instance indicates that the port's
+ * @return <code>true</code> if remote instance indicates that the port's
* destination is full
- *
- * @return
* @throws IllegalStateException if a handshake has not successfully
* completed
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
index 5e56902..3fa3e96 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -23,10 +23,11 @@ public interface CommunicationsInput {
/**
* Reads all data currently on the socket and throws it away
- * @throws IOException
+ *
+ * @throws IOException if unable to consume
*/
void consume() throws IOException;
-
+
InputStream getInputStream() throws IOException;
long getBytesRead();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
index d009cec..aff73ba 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -55,10 +55,8 @@ public interface CommunicationsSession extends Closeable {
void interrupt();
/**
- * Returns <code>true</code> if the connection is closed, <code>false</code>
- * otherwise.
- *
- * @return
+ * @return <code>true</code> if the connection is closed, <code>false</code>
+ * otherwise
*/
boolean isClosed();
}