You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/10 02:42:02 UTC
[1/3] incubator-nifi git commit: NIFI-282: Refactoring to allow for
separate client
Repository: incubator-nifi
Updated Branches:
refs/heads/nifi-site-to-site-client 05b64593b -> 081471c42
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index a51cdba..1e33e1f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -23,6 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,10 +39,9 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.client.socket.EndpointConnectionState;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.client.socket.EndpointConnection;
+import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
@@ -50,6 +51,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,9 +68,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final AtomicBoolean useCompression = new AtomicBoolean(false);
private final AtomicBoolean targetExists = new AtomicBoolean(true);
private final AtomicBoolean targetRunning = new AtomicBoolean(true);
+ private final SSLContext sslContext;
private final TransferDirection transferDirection;
- private final EndpointConnectionStatePool connectionStatePool;
+ private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
private final Lock interruptLock = new ReentrantLock();
@@ -83,9 +86,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.remoteGroup = remoteGroup;
this.transferDirection = direction;
+ this.sslContext = sslContext;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
-
- connectionStatePool = remoteGroup.getConnectionPool();
+ }
+
+ private static File getPeerPersistenceFile(final String portId) {
+ final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
+ return new File(stateDir, portId + ".peers");
}
@Override
@@ -111,6 +118,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
+
+ final EndpointConnectionPool pool = connectionPoolRef.get();
+ if ( pool != null ) {
+ pool.shutdown();
+ }
}
@Override
@@ -123,6 +135,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
+
+ final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(),
+ remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
+ sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
+ connectionPoolRef.set(connectionPool);
}
@@ -140,9 +157,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString();
- final EndpointConnectionState connectionState;
+ final EndpointConnectionPool connectionPool = connectionPoolRef.get();
+ final EndpointConnection connection;
try {
- connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+ connection = connectionPool.getEndpointConnection(this, transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@@ -157,7 +175,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
logger.error(message);
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
- } catch (final HandshakeException | IOException e) {
+ } catch (final IOException e) {
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
if ( logger.isDebugEnabled() ) {
@@ -168,15 +186,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
- if ( connectionState == null ) {
+ if ( connection == null ) {
logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
context.yield();
return;
}
- FlowFileCodec codec = connectionState.getCodec();
- SocketClientProtocol protocol = connectionState.getSocketClientProtocol();
- final Peer peer = connectionState.getPeer();
+ FlowFileCodec codec = connection.getCodec();
+ SocketClientProtocol protocol = connection.getSocketClientProtocol();
+ final Peer peer = connection.getPeer();
url = peer.getUrl();
try {
@@ -194,7 +212,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
transferFlowFiles(peer, protocol, context, session, codec);
} else {
- receiveFlowFiles(peer, protocol, context, session, codec);
+ final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
+ if ( numReceived == 0 ) {
+ context.yield();
+ }
}
interruptLock.lock();
@@ -210,13 +231,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
session.commit();
- connectionState.setLastTimeUsed();
- connectionStatePool.offer(connectionState);
+ connection.setLastTimeUsed();
+ connectionPool.offer(connection);
} catch (final TransmissionDisabledException e) {
cleanup(protocol, peer);
session.rollback();
} catch (final Exception e) {
- connectionStatePool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
+ connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
logger.error(message);
@@ -261,12 +282,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
- private void transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- protocol.transferFlowFiles(peer, context, session, codec);
+ private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ return protocol.transferFlowFiles(peer, context, session, codec);
}
- private void receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- protocol.receiveFlowFiles(peer, context, session, codec);
+ private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ return protocol.receiveFlowFiles(peer, context, session, codec);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
deleted file mode 100644
index dca1d84..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
- private final SSLSocketChannel channel;
- private final SSLSocketChannelInput request;
- private final SSLSocketChannelOutput response;
-
- public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
- super(uri);
- request = new SSLSocketChannelInput(channel);
- response = new SSLSocketChannelOutput(channel);
- this.channel = channel;
- }
-
- @Override
- public SSLSocketChannelInput getInput() {
- return request;
- }
-
- @Override
- public SSLSocketChannelOutput getOutput() {
- return response;
- }
-
- @Override
- public void setTimeout(final int millis) throws IOException {
- channel.setTimeout(millis);
- }
-
- @Override
- public int getTimeout() throws IOException {
- return channel.getTimeout();
- }
-
- @Override
- public void close() throws IOException {
- channel.close();
- }
-
- @Override
- public boolean isClosed() {
- return channel.isClosed();
- }
-
- @Override
- public boolean isDataAvailable() {
- try {
- return request.isDataAvailable();
- } catch (final Exception e) {
- return false;
- }
- }
-
- @Override
- public long getBytesWritten() {
- return response.getBytesWritten();
- }
-
- @Override
- public long getBytesRead() {
- return request.getBytesRead();
- }
-
- @Override
- public void interrupt() {
- channel.interrupt();
- }
-
- @Override
- public String toString() {
- return super.toString() + "[SSLSocketChannel=" + channel + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
deleted file mode 100644
index 60ef33f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-
-public class SSLSocketChannelInput implements CommunicationsInput {
- private final SSLSocketChannelInputStream in;
- private final ByteCountingInputStream countingIn;
- private final InputStream bufferedIn;
-
- public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
- in = new SSLSocketChannelInputStream(socketChannel);
- countingIn = new ByteCountingInputStream(in);
- this.bufferedIn = new BufferedInputStream(countingIn);
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return bufferedIn;
- }
-
- public boolean isDataAvailable() throws IOException {
- return bufferedIn.available() > 0;
- }
-
- @Override
- public long getBytesRead() {
- return countingIn.getBytesRead();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
deleted file mode 100644
index dc3d68f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-
-public class SSLSocketChannelOutput implements CommunicationsOutput {
- private final OutputStream out;
- private final ByteCountingOutputStream countingOut;
-
- public SSLSocketChannelOutput(final SSLSocketChannel channel) {
- countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
- out = new BufferedOutputStream(countingOut);
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return out;
- }
-
- @Override
- public long getBytesWritten() {
- return countingOut.getBytesWritten();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index eb22b0e..63c960d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -430,7 +430,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
- peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+ peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS));
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
}
[2/3] incubator-nifi git commit: NIFI-282: Refactoring to allow for
separate client
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
deleted file mode 100644
index 8c23e28..0000000
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ /dev/null
@@ -1,835 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client.socket;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
-
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.RemoteDestination;
-import org.apache.nifi.remote.RemoteResourceInitiator;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.apache.nifi.remote.util.RemoteNiFiUtils;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EndpointConnectionStatePool {
- public static final long PEER_REFRESH_PERIOD = 60000L;
- public static final String CATEGORY = "Site-to-Site";
- public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
-
- private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
-
- private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class);
-
- private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>();
- private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
- private final URI clusterUrl;
- private final String apiUri;
-
- private final AtomicLong peerIndex = new AtomicLong(0L);
-
- private final ReentrantLock peerRefreshLock = new ReentrantLock();
- private volatile List<PeerStatus> peerStatuses;
- private volatile long peerRefreshTime = 0L;
- private volatile PeerStatusCache peerStatusCache;
- private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
-
- private final File peersFile;
- private final EventReporter eventReporter;
- private final SSLContext sslContext;
- private final ScheduledExecutorService taskExecutor;
-
- private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
- private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
- private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
- private Integer siteToSitePort;
- private Boolean siteToSiteSecure;
- private long remoteRefreshTime;
- private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
- private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
-
- private volatile int commsTimeout;
-
- public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
- this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
- }
-
- public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
- try {
- this.clusterUrl = new URI(clusterUrl);
- } catch (final URISyntaxException e) {
- throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
- }
-
- // Trim the trailing /
- String uriPath = this.clusterUrl.getPath();
- if (uriPath.endsWith("/")) {
- uriPath = uriPath.substring(0, uriPath.length() - 1);
- }
- apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
-
- this.sslContext = sslContext;
- this.peersFile = persistenceFile;
- this.eventReporter = eventReporter;
- this.commsTimeout = commsTimeoutMillis;
-
- Set<PeerStatus> recoveredStatuses;
- if ( persistenceFile != null && persistenceFile.exists() ) {
- try {
- recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
- this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified());
- } catch (final IOException ioe) {
- logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
- }
- } else {
- peerStatusCache = null;
- }
-
- // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
- // connections and keep our list of peers up-to-date.
- taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread thread = defaultFactory.newThread(r);
- thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
- return thread;
- }
- });
-
- taskExecutor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- refreshPeers();
- }
- }, 0, 5, TimeUnit.SECONDS);
-
- taskExecutor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- cleanupExpiredSockets();
- }
- }, 5, 5, TimeUnit.SECONDS);
- }
-
-
- public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
- return getEndpointConnectionState(remoteDestination, direction, null);
- }
-
-
-
- public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
- //
- // Attempt to get a connection state that already exists for this URL.
- //
- FlowFileCodec codec = null;
- CommunicationsSession commsSession = null;
- SocketClientProtocol protocol = null;
- EndpointConnectionState connectionState;
- Peer peer = null;
-
- final List<EndpointConnectionState> addBack = new ArrayList<>();
- try {
- do {
- final PeerStatus peerStatus = getNextPeerStatus(direction);
- if ( peerStatus == null ) {
- return null;
- }
-
- connectionState = connectionStateQueue.poll();
- logger.debug("{} Connection State for {} = {}", this, clusterUrl, connectionState);
-
- if ( connectionState == null && !addBack.isEmpty() ) {
- // all available connections have been penalized.
- return null;
- }
-
- if ( connectionState != null && connectionState.getPeer().isPenalized() ) {
- // we have a connection, but it's penalized. We want to add it back to the queue
- // when we've found one to use.
- addBack.add(connectionState);
- continue;
- }
-
- // if we can't get an existing ConnectionState, create one
- if ( connectionState == null ) {
- protocol = new SocketClientProtocol();
- protocol.setDestination(remoteDestination);
-
- try {
- commsSession = establishSiteToSiteConnection(peerStatus);
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
- try {
- RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
- } catch (final HandshakeException e) {
- try {
- commsSession.close();
- } catch (final IOException ioe) {
- throw e;
- }
- }
- } catch (final IOException e) {
- }
-
-
- final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
- peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
-
- // set properties based on config
- if ( config != null ) {
- protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
- protocol.setPreferredBatchCount(config.getPreferredBatchCount());
- protocol.setPreferredBatchSize(config.getPreferredBatchSize());
- protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
- }
-
- // perform handshake
- try {
- protocol.handshake(peer);
-
- // handle error cases
- if ( protocol.isDestinationFull() ) {
- logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
- connectionStateQueue.offer(connectionState);
- continue;
- } else if ( protocol.isPortInvalid() ) {
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
- cleanup(protocol, peer);
- throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running");
- } else if ( protocol.isPortUnknown() ) {
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
- cleanup(protocol, peer);
- throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known");
- }
-
- // negotiate the FlowFileCodec to use
- codec = protocol.negotiateCodec(peer);
- } catch (final PortNotRunningException | UnknownPortException e) {
- throw e;
- } catch (final Exception e) {
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
- cleanup(protocol, peer);
-
- final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
- logger.error(message);
- if ( logger.isDebugEnabled() ) {
- logger.error("", e);
- }
- throw e;
- }
-
- connectionState = new EndpointConnectionState(peer, protocol, codec);
- } else {
- final long lastTimeUsed = connectionState.getLastTimeUsed();
- final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
-
- if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) {
- cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
- connectionState = null;
- } else {
- codec = connectionState.getCodec();
- peer = connectionState.getPeer();
- commsSession = peer.getCommunicationsSession();
- protocol = connectionState.getSocketClientProtocol();
- }
- }
- } while ( connectionState == null || codec == null || commsSession == null || protocol == null );
- } finally {
- if ( !addBack.isEmpty() ) {
- connectionStateQueue.addAll(addBack);
- }
- }
-
- return connectionState;
- }
-
-
- public boolean offer(final EndpointConnectionState endpointConnectionState) {
- final Peer peer = endpointConnectionState.getPeer();
- if ( peer == null ) {
- return false;
- }
-
- final String url = peer.getUrl();
- if ( url == null ) {
- return false;
- }
-
- return connectionStateQueue.offer(endpointConnectionState);
- }
-
- /**
- * Updates internal state map to penalize a PeerStatus that points to the specified peer
- * @param peer
- */
- public void penalize(final Peer peer, final long penalizationMillis) {
- String host;
- int port;
- try {
- final URI uri = new URI(peer.getUrl());
- host = uri.getHost();
- port = uri.getPort();
- } catch (final URISyntaxException e) {
- host = peer.getHost();
- port = -1;
- }
-
- final PeerStatus status = new PeerStatus(host, port, true, 1);
- Long expiration = peerTimeoutExpirations.get(status);
- if ( expiration == null ) {
- expiration = Long.valueOf(0L);
- }
-
- final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
- peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
- }
-
- private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
- if ( protocol != null && peer != null ) {
- try {
- protocol.shutdown(peer);
- } catch (final TransmissionDisabledException e) {
- // User disabled transmission.... do nothing.
- logger.debug(this + " Transmission Disabled by User");
- } catch (IOException e1) {
- }
- }
-
- if ( peer != null ) {
- try {
- peer.close();
- } catch (final TransmissionDisabledException e) {
- // User disabled transmission.... do nothing.
- logger.debug(this + " Transmission Disabled by User");
- } catch (IOException e1) {
- }
- }
- }
-
- private PeerStatus getNextPeerStatus(final TransferDirection direction) {
- List<PeerStatus> peerList = peerStatuses;
- if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && peerRefreshLock.tryLock() ) {
- try {
- try {
- peerList = createPeerStatusList(direction);
- } catch (final Exception e) {
- final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
- logger.warn(message);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", e);
- }
-
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
- }
- }
-
- this.peerStatuses = peerList;
- peerRefreshTime = System.currentTimeMillis();
- } finally {
- peerRefreshLock.unlock();
- }
- }
-
- if ( peerList == null || peerList.isEmpty() ) {
- return null;
- }
-
- PeerStatus peerStatus;
- for (int i=0; i < peerList.size(); i++) {
- final long idx = peerIndex.getAndIncrement();
- final int listIndex = (int) (idx % peerList.size());
- peerStatus = peerList.get(listIndex);
-
- if ( isPenalized(peerStatus) ) {
- logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
- } else {
- return peerStatus;
- }
- }
-
- logger.debug("{} All peers appear to be penalized; returning null", this);
- return null;
- }
-
- private boolean isPenalized(final PeerStatus peerStatus) {
- final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
- return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() );
- }
-
- private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
- final Set<PeerStatus> statuses = getPeerStatuses();
- if ( statuses == null ) {
- return new ArrayList<>();
- }
-
- final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
- final List<NodeInformation> nodeInfos = new ArrayList<>();
- for ( final PeerStatus peerStatus : statuses ) {
- final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
- nodeInfos.add(nodeInfo);
- }
- clusterNodeInfo.setNodeInformation(nodeInfos);
- return formulateDestinationList(clusterNodeInfo, direction);
- }
-
-
- private Set<PeerStatus> getPeerStatuses() {
- final PeerStatusCache cache = this.peerStatusCache;
- if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
- return null;
- }
-
- if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
- final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
- for (final PeerStatus status : cache.getStatuses()) {
- final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
- equalizedSet.add(equalizedStatus);
- }
-
- return equalizedSet;
- }
-
- return cache.getStatuses();
- }
-
- private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
- final String hostname = clusterUrl.getHost();
- final int port = getSiteToSitePort();
-
- final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
- final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
- final SocketClientProtocol clientProtocol = new SocketClientProtocol();
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
- RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
-
- clientProtocol.setTimeout(commsTimeout);
- clientProtocol.handshake(peer, null);
- final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
- persistPeerStatuses(peerStatuses);
-
- try {
- clientProtocol.shutdown(peer);
- } catch (final IOException e) {
- final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
- logger.warn(message);
- if (logger.isDebugEnabled()) {
- logger.warn("", e);
- }
- }
-
- try {
- peer.close();
- } catch (final IOException e) {
- final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
- logger.warn(message);
- if (logger.isDebugEnabled()) {
- logger.warn("", e);
- }
- }
-
- return peerStatuses;
- }
-
-
- private void persistPeerStatuses(final Set<PeerStatus> statuses) {
- if ( peersFile == null ) {
- return;
- }
-
- try (final OutputStream fos = new FileOutputStream(peersFile);
- final OutputStream out = new BufferedOutputStream(fos)) {
-
- for (final PeerStatus status : statuses) {
- final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
- out.write(line.getBytes(StandardCharsets.UTF_8));
- }
-
- } catch (final IOException e) {
- logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
- }
- }
-
- private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
- if (!file.exists()) {
- return null;
- }
-
- final Set<PeerStatus> statuses = new HashSet<>();
- try (final InputStream fis = new FileInputStream(file);
- final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
-
- String line;
- while ((line = reader.readLine()) != null) {
- final String[] splits = line.split(Pattern.quote(":"));
- if (splits.length != 3) {
- continue;
- }
-
- final String hostname = splits[0];
- final int port = Integer.parseInt(splits[1]);
- final boolean secure = Boolean.parseBoolean(splits[2]);
-
- statuses.add(new PeerStatus(hostname, port, secure, 1));
- }
- }
-
- return statuses;
- }
-
-
- private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
- return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
- }
-
- private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
- if ( siteToSiteSecure == null ) {
- throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
- }
-
- final String destinationUri = "nifi://" + hostname + ":" + port;
-
- CommunicationsSession commsSession = null;
- try {
- if ( siteToSiteSecure ) {
- if ( sslContext == null ) {
- throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
- }
-
- final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
- socketChannel.connect();
-
- commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
- try {
- commsSession.setUserDn(socketChannel.getDn());
- } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
- throw new IOException(ex);
- }
- } else {
- final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
- commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
- }
-
- commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
- commsSession.setUri(destinationUri);
- } catch (final IOException ioe) {
- if ( commsSession != null ) {
- commsSession.close();
- }
-
- throw ioe;
- }
-
- return commsSession;
- }
-
-
- static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
- final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
- final int numDestinations = Math.max(128, nodeInfoSet.size());
- final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();
-
- long totalFlowFileCount = 0L;
- for (final NodeInformation nodeInfo : nodeInfoSet) {
- totalFlowFileCount += nodeInfo.getTotalFlowFiles();
- }
-
- int totalEntries = 0;
- for (final NodeInformation nodeInfo : nodeInfoSet) {
- final int flowFileCount = nodeInfo.getTotalFlowFiles();
- // don't allow any node to get more than 80% of the data
- final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
- final double relativeWeighting = (direction == TransferDirection.RECEIVE) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
- final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
-
- entryCountMap.put(nodeInfo, Math.max(1, entries));
- totalEntries += entries;
- }
-
- final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
- for (int i=0; i < totalEntries; i++) {
- destinations.add(null);
- }
- for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
- final NodeInformation nodeInfo = entry.getKey();
- final int numEntries = entry.getValue();
-
- int skipIndex = numEntries;
- for (int i=0; i < numEntries; i++) {
- int n = (skipIndex * i);
- while (true) {
- final int index = n % destinations.size();
- PeerStatus status = destinations.get(index);
- if ( status == null ) {
- status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
- destinations.set(index, status);
- break;
- } else {
- n++;
- }
- }
- }
- }
-
- final StringBuilder distributionDescription = new StringBuilder();
- distributionDescription.append("New Weighted Distribution of Nodes:");
- for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
- final double percentage = entry.getValue() * 100D / (double) destinations.size();
- distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
- }
- logger.info(distributionDescription.toString());
-
- // Jumble the list of destinations.
- return destinations;
- }
-
-
- private void cleanupExpiredSockets() {
- final List<EndpointConnectionState> states = new ArrayList<>();
-
- EndpointConnectionState state;
- while ((state = connectionStateQueue.poll()) != null) {
- // If the socket has not been used in 10 seconds, shut it down.
- final long lastUsed = state.getLastTimeUsed();
- if ( lastUsed < System.currentTimeMillis() - 10000L ) {
- try {
- state.getSocketClientProtocol().shutdown(state.getPeer());
- } catch (final Exception e) {
- logger.debug("Failed to shut down {} using {} due to {}",
- new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
- }
-
- cleanup(state.getSocketClientProtocol(), state.getPeer());
- } else {
- states.add(state);
- }
- }
-
- connectionStateQueue.addAll(states);
- }
-
- public void shutdown() {
- taskExecutor.shutdown();
- peerTimeoutExpirations.clear();
-
- for ( final CommunicationsSession commsSession : activeCommsChannels ) {
- commsSession.interrupt();
- }
-
- EndpointConnectionState state;
- while ( (state = connectionStateQueue.poll()) != null) {
- cleanup(state.getSocketClientProtocol(), state.getPeer());
- }
- }
-
- public void terminate(final EndpointConnectionState state) {
- cleanup(state.getSocketClientProtocol(), state.getPeer());
- }
-
- private void refreshPeers() {
- final PeerStatusCache existingCache = peerStatusCache;
- if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
- return;
- }
-
- try {
- final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
- peerStatusCache = new PeerStatusCache(statuses);
- logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
- } catch (Exception e) {
- logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
- if (logger.isDebugEnabled()) {
- logger.warn("", e);
- }
- }
- }
-
-
- public String getInputPortIdentifier(final String portName) throws IOException {
- return getPortIdentifier(portName, inputPortMap);
- }
-
- public String getOutputPortIdentifier(final String portName) throws IOException {
- return getPortIdentifier(portName, outputPortMap);
- }
-
-
- private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException {
- String identifier;
- remoteInfoReadLock.lock();
- try {
- identifier = portMap.get(portName);
- } finally {
- remoteInfoReadLock.unlock();
- }
-
- if ( identifier != null ) {
- return identifier;
- }
-
- refreshRemoteInfo();
-
- remoteInfoReadLock.lock();
- try {
- return portMap.get(portName);
- } finally {
- remoteInfoReadLock.unlock();
- }
- }
-
-
- private ControllerDTO refreshRemoteInfo() throws IOException {
- final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
- final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null);
- final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
-
- remoteInfoWriteLock.lock();
- try {
- this.siteToSitePort = controller.getRemoteSiteListeningPort();
- this.siteToSiteSecure = controller.isSiteToSiteSecure();
-
- inputPortMap.clear();
- for (final PortDTO inputPort : controller.getInputPorts()) {
- inputPortMap.put(inputPort.getName(), inputPort.getId());
- }
-
- outputPortMap.clear();
- for ( final PortDTO outputPort : controller.getOutputPorts()) {
- outputPortMap.put(outputPort.getName(), outputPort.getId());
- }
-
- this.remoteRefreshTime = System.currentTimeMillis();
- } finally {
- remoteInfoWriteLock.unlock();
- }
-
- return controller;
- }
-
- /**
- * @return the port that the remote instance is listening on for
- * site-to-site communication, or <code>null</code> if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- private Integer getSiteToSitePort() throws IOException {
- Integer listeningPort;
- remoteInfoReadLock.lock();
- try {
- listeningPort = this.siteToSitePort;
- if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
- return listeningPort;
- }
- } finally {
- remoteInfoReadLock.unlock();
- }
-
- final ControllerDTO controller = refreshRemoteInfo();
- listeningPort = controller.getRemoteSiteListeningPort();
-
- return listeningPort;
- }
-
- /**
- * Returns {@code true} if the remote instance is configured for secure site-to-site communications,
- * {@code false} otherwise.
- *
- * @return
- * @throws IOException
- */
- public boolean isSecure() throws IOException {
- remoteInfoReadLock.lock();
- try {
- final Boolean secure = this.siteToSiteSecure;
- if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
- return secure;
- }
- } finally {
- remoteInfoReadLock.unlock();
- }
-
- final ControllerDTO controller = refreshRemoteInfo();
- return controller.isSiteToSiteSecure();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 0494d04..6fa934b 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -24,23 +24,23 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.util.ObjectHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SocketClient implements SiteToSiteClient {
+ private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);
+
private final SiteToSiteClientConfig config;
- private final EndpointConnectionStatePool pool;
+ private final EndpointConnectionPool pool;
private final boolean compress;
private final String portName;
private final long penalizationNanos;
private volatile String portIdentifier;
public SocketClient(final SiteToSiteClientConfig config) {
- pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
this.config = config;
@@ -66,44 +66,55 @@ public class SocketClient implements SiteToSiteClient {
return id;
}
+ final String portId;
if ( direction == TransferDirection.SEND ) {
- return pool.getInputPortIdentifier(this.portName);
+ portId = pool.getInputPortIdentifier(this.portName);
} else {
- return pool.getOutputPortIdentifier(this.portName);
+ portId = pool.getOutputPortIdentifier(this.portName);
}
+
+ if (portId == null) {
+ logger.debug("Unable to resolve port [{}] to an identifier", portName);
+ } else {
+ logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
+ }
+
+ return portId;
}
+ private RemoteDestination createRemoteDestination(final String portId) {
+ return new RemoteDestination() {
+ @Override
+ public String getIdentifier() {
+ return portId;
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public boolean isUseCompression() {
+ return compress;
+ }
+ };
+ }
+
@Override
public Transaction createTransaction(final TransferDirection direction) throws IOException {
- final String portId = getPortIdentifier(TransferDirection.SEND);
+ final String portId = getPortIdentifier(direction);
if ( portId == null ) {
- throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance");
+ throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
}
- final RemoteDestination remoteDestination = new RemoteDestination() {
- @Override
- public String getIdentifier() {
- return portId;
- }
-
- @Override
- public long getYieldPeriod(final TimeUnit timeUnit) {
- return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
- }
-
- @Override
- public boolean isUseCompression() {
- return compress;
- }
- };
+ final RemoteDestination remoteDestination = createRemoteDestination(portId);
- final EndpointConnectionState connectionState;
- try {
- connectionState = pool.getEndpointConnectionState(remoteDestination, direction);
- } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
- throw new IOException(e);
+ final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig());
+ if ( connectionState == null ) {
+ return null;
}
final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction(
@@ -111,7 +122,7 @@ public class SocketClient implements SiteToSiteClient {
// Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
// the transaction is either completed or canceled.
- final ObjectHolder<EndpointConnectionState> connectionStateRef = new ObjectHolder<>(connectionState);
+ final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState);
return new Transaction() {
@Override
public void confirm() throws IOException {
@@ -119,11 +130,16 @@ public class SocketClient implements SiteToSiteClient {
}
@Override
+ public void complete() throws IOException {
+ complete(false);
+ }
+
+ @Override
public void complete(final boolean requestBackoff) throws IOException {
try {
transaction.complete(requestBackoff);
} finally {
- final EndpointConnectionState state = connectionStateRef.get();
+ final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
pool.offer(connectionState);
connectionStateRef.set(null);
@@ -136,7 +152,7 @@ public class SocketClient implements SiteToSiteClient {
try {
transaction.cancel(explanation);
} finally {
- final EndpointConnectionState state = connectionStateRef.get();
+ final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
pool.terminate(connectionState);
connectionStateRef.set(null);
@@ -149,7 +165,7 @@ public class SocketClient implements SiteToSiteClient {
try {
transaction.error();
} finally {
- final EndpointConnectionState state = connectionStateRef.get();
+ final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
pool.terminate(connectionState);
connectionStateRef.set(null);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
index b61fc65..d4d55e1 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -16,8 +16,15 @@
*/
package org.apache.nifi.remote.exception;
-public class HandshakeException extends Exception {
+import java.io.IOException;
+
+/**
+ * A HandshakeException occurs when the client and the remote NiFi instance do not agree
+ * on some condition during the handshake. For example, if the NiFi instance does not recognize
+ * one of the parameters that the client passes during the Handshaking phase.
+ */
+public class HandshakeException extends IOException {
private static final long serialVersionUID = 178192341908726L;
public HandshakeException(final String message) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
index af0f467..8b97832 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -16,8 +16,12 @@
*/
package org.apache.nifi.remote.exception;
-public class PortNotRunningException extends Exception {
-
+/**
+ * PortNotRunningException occurs when the remote NiFi instance reports
+ * that the Port that the client is attempting to communicate with is not
+ * currently running and therefore communications with that Port are not allowed.
+ */
+public class PortNotRunningException extends ProtocolException {
private static final long serialVersionUID = -2790940982005516375L;
public PortNotRunningException(final String message) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index e12348a..45a4e15 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -18,6 +18,10 @@ package org.apache.nifi.remote.exception;
import java.io.IOException;
+/**
+ * A ProtocolException occurs when unexpected data is received, for example
+ * an invalid Response Code.
+ */
public class ProtocolException extends IOException {
private static final long serialVersionUID = 5763900324505818495L;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
index e6a0fe7..592a1b3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -16,8 +16,11 @@
*/
package org.apache.nifi.remote.exception;
-public class UnknownPortException extends Exception {
-
+/**
+ * An UnknownPortException indicates that the remote NiFi instance has reported that
+ * the endpoint that the client attempted to communicate with does not exist.
+ */
+public class UnknownPortException extends ProtocolException {
private static final long serialVersionUID = -2790940982005516375L;
public UnknownPortException(final String message) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
index 9e451fd..7dffddd 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -63,4 +63,9 @@ public class SocketChannelInput implements CommunicationsInput {
public void interrupt() {
interruptableIn.interrupt();
}
+
+ @Override
+ public void consume() throws IOException {
+ socketIn.consume();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
index 60ef33f..01fb9f2 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
@@ -47,4 +47,9 @@ public class SSLSocketChannelInput implements CommunicationsInput {
public long getBytesRead() {
return countingIn.getBytesRead();
}
+
+ @Override
+ public void consume() throws IOException {
+ in.consume();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index befbdaa..36a0e8d 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -40,9 +40,9 @@ public interface ClientProtocol extends VersionedRemoteResource {
FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
- void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+ int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
- void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+ int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
void shutdown(Peer peer) throws IOException, ProtocolException;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
index d2e2946..5e56902 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -21,6 +21,12 @@ import java.io.InputStream;
public interface CommunicationsInput {
+ /**
+ * Reads all data currently on the socket and throws it away
+ * @throws IOException
+ */
+ void consume() throws IOException;
+
InputStream getInputStream() throws IOException;
long getBytesRead();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 5f194f8..e321663 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -150,6 +150,7 @@ public class SocketClientProtocol implements ClientProtocol {
}
}
+ logger.debug("Handshaking with properties {}", properties);
dos.writeInt(properties.size());
for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
dos.writeUTF(entry.getKey().name());
@@ -269,13 +270,13 @@ public class SocketClientProtocol implements ClientProtocol {
throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
}
- return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec,
+ return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec,
direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
}
@Override
- public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
final String userDn = peer.getCommunicationsSession().getUserDn();
final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
@@ -288,7 +289,7 @@ public class SocketClientProtocol implements ClientProtocol {
final DataPacket dataPacket = transaction.receive();
if ( dataPacket == null ) {
if ( flowFilesReceived.isEmpty() ) {
- peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS));
}
break;
}
@@ -322,25 +323,25 @@ public class SocketClientProtocol implements ClientProtocol {
transaction.complete(applyBackpressure);
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
- if ( flowFilesReceived.isEmpty() ) {
- return;
+ if ( !flowFilesReceived.isEmpty() ) {
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
}
- stopWatch.stop();
- final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
- final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
- final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
- final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
- this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
+ return flowFilesReceived.size();
}
@Override
- public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
FlowFile flowFile = session.get();
if (flowFile == null) {
- return;
+ return 0;
}
try {
@@ -401,6 +402,8 @@ public class SocketClientProtocol implements ClientProtocol {
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+
+ return flowFilesSent.size();
} catch (final Exception e) {
session.rollback();
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index edb360e..cf8f9b2 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,6 +19,7 @@ package org.apache.nifi.remote.protocol.socket;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
@@ -29,6 +30,8 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.slf4j.Logger;
@@ -47,14 +50,16 @@ public class SocketClientTransaction implements Transaction {
private final boolean compress;
private final Peer peer;
private final int penaltyMillis;
+ private final String destinationId;
private boolean dataAvailable = false;
private int transfers = 0;
private TransactionState state;
- SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec,
+ SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
this.protocolVersion = protocolVersion;
+ this.destinationId = destinationId;
this.peer = peer;
this.codec = codec;
this.direction = direction;
@@ -140,7 +145,8 @@ public class SocketClientTransaction implements Transaction {
}
logger.debug("{} Receiving data from {}", this, peer);
- final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc));
+ final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
+ final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
if ( packet == null ) {
this.dataAvailable = false;
@@ -174,7 +180,8 @@ public class SocketClientTransaction implements Transaction {
logger.debug("{} Sending data to {}", this, peer);
- final OutputStream out = new CheckedOutputStream(dos, crc);
+ final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
+ final OutputStream out = new CheckedOutputStream(dataOut, crc);
codec.encode(dataPacket, out);
// need to close the CompressionOutputStream in order to force it write out any remaining bytes.
@@ -208,6 +215,10 @@ public class SocketClientTransaction implements Transaction {
}
}
+ @Override
+ public void complete() throws IOException {
+ complete(false);
+ }
@Override
public void complete(boolean requestBackoff) throws IOException {
@@ -246,7 +257,7 @@ public class SocketClientTransaction implements Transaction {
logger.debug("{} Received {} from {}", this, transactionResponse, peer);
if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
- peer.penalize(penaltyMillis);
+ peer.penalize(destinationId, penaltyMillis);
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
index d8899ea..275e40c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -39,7 +39,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
@@ -53,7 +53,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
@@ -73,7 +73,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
@@ -87,7 +87,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
new file mode 100644
index 0000000..a744905
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestSiteToSiteClient {
+
+ @Test
+ @Ignore("For local testing only; not really a unit test but a manual test")
+ public void testReceive() throws IOException {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
+
+ final SiteToSiteClient client = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("out")
+ .requestBatchCount(1)
+ .build();
+
+ try {
+ final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+ Assert.assertNotNull(transaction);
+
+ final DataPacket packet = transaction.receive();
+ Assert.assertNotNull(packet);
+
+ final InputStream in = packet.getData();
+ final long size = packet.getSize();
+ final byte[] buff = new byte[(int) size];
+
+ StreamUtils.fillBuffer(in, buff);
+ System.out.println(buff.length);
+
+ Assert.assertNull(transaction.receive());
+
+ transaction.confirm();
+ transaction.complete(false);
+ } finally {
+ client.close();
+ }
+ }
+
+
+ @Test
+ @Ignore("For local testing only; not really a unit test but a manual test")
+ public void testSend() throws IOException {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
+
+ final SiteToSiteClient client = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("in")
+ .build();
+
+ try {
+ final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+ Assert.assertNotNull(transaction);
+
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("site-to-site", "yes, please!");
+ final byte[] bytes = "Hello".getBytes();
+ final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
+ transaction.send(packet);
+
+ transaction.confirm();
+ transaction.complete(false);
+ } finally {
+ client.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
index 32a3f26..f68c874 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -43,6 +43,16 @@ public class SocketChannelInputStream extends InputStream {
public void setTimeout(final int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
+
+ public void consume() throws IOException {
+ final byte[] b = new byte[4096];
+ final ByteBuffer buffer = ByteBuffer.wrap(b);
+ int bytesRead;
+ do {
+ bytesRead = channel.read(buffer);
+ buffer.flip();
+ } while ( bytesRead > 0 );
+ }
@Override
public int read() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
index 5810488..7c74b20 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -258,6 +258,16 @@ public class SSLSocketChannel implements Closeable {
}
}
+ public void consume() throws IOException {
+ final byte[] b = new byte[4096];
+ final ByteBuffer buffer = ByteBuffer.wrap(b);
+ int readCount;
+ do {
+ readCount = channel.read(buffer);
+ buffer.flip();
+ } while (readCount > 0);
+ }
+
private int readData(final ByteBuffer dest) throws IOException {
final long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
index 154bd08..6fb79d4 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -27,6 +27,10 @@ public class SSLSocketChannelInputStream extends InputStream {
this.channel = channel;
}
+ public void consume() throws IOException {
+ channel.consume();
+ }
+
@Override
public int read() throws IOException {
return channel.read();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index ac41cba..c842195 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -24,9 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
public interface RemoteProcessGroup {
@@ -81,8 +79,6 @@ public interface RemoteProcessGroup {
String getYieldDuration();
- EndpointConnectionStatePool getConnectionPool();
-
/**
* Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
*
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7cb2874..54f0807 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -128,14 +129,12 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ProcessorLogObserver;
import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.Processor;
@@ -165,6 +164,7 @@ import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
@@ -184,7 +184,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index db0aeb7..79ef7a8 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
@@ -130,7 +129,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private volatile String authorizationIssue;
- private final EndpointConnectionStatePool endpointConnectionPool;
private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup,
@@ -172,13 +170,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
- endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS),
- sslContext, eventReporter, getPeerPersistenceFile());
-
final Runnable checkAuthorizations = new InitializationTask();
-
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
- backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS);
+ backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
}
@Override
@@ -200,7 +194,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void shutdown() {
backgroundThreadExecutor.shutdown();
- endpointConnectionPool.shutdown();
}
@Override
@@ -1222,11 +1215,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
@Override
- public EndpointConnectionStatePool getConnectionPool() {
- return endpointConnectionPool;
- }
-
- @Override
public void verifyCanDelete() {
verifyCanDelete(false);
}
[3/3] incubator-nifi git commit: NIFI-282: Refactoring to allow for
separate client
Posted by ma...@apache.org.
NIFI-282: Refactoring to allow for separate client
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/081471c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/081471c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/081471c4
Branch: refs/heads/nifi-site-to-site-client
Commit: 081471c420f113f0eb1440df74f7dff0e04067ec
Parents: 05b6459
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 9 20:41:39 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 9 20:41:39 2015 -0500
----------------------------------------------------------------------
.../main/java/org/apache/nifi/remote/Peer.java | 31 +-
.../org/apache/nifi/remote/Transaction.java | 10 +
.../nifi/remote/client/SiteToSiteClient.java | 14 +-
.../client/socket/EndpointConnection.java | 54 ++
.../client/socket/EndpointConnectionPool.java | 855 +++++++++++++++++++
.../client/socket/EndpointConnectionState.java | 54 --
.../socket/EndpointConnectionStatePool.java | 835 ------------------
.../nifi/remote/client/socket/SocketClient.java | 86 +-
.../remote/exception/HandshakeException.java | 9 +-
.../exception/PortNotRunningException.java | 8 +-
.../remote/exception/ProtocolException.java | 4 +
.../remote/exception/UnknownPortException.java | 7 +-
.../remote/io/socket/SocketChannelInput.java | 5 +
.../io/socket/ssl/SSLSocketChannelInput.java | 5 +
.../nifi/remote/protocol/ClientProtocol.java | 4 +-
.../remote/protocol/CommunicationsInput.java | 6 +
.../protocol/socket/SocketClientProtocol.java | 31 +-
.../socket/SocketClientTransaction.java | 19 +-
.../socket/TestEndpointConnectionStatePool.java | 8 +-
.../client/socket/TestSiteToSiteClient.java | 100 +++
.../io/socket/SocketChannelInputStream.java | 10 +
.../remote/io/socket/ssl/SSLSocketChannel.java | 10 +
.../socket/ssl/SSLSocketChannelInputStream.java | 4 +
.../apache/nifi/groups/RemoteProcessGroup.java | 4 -
.../apache/nifi/controller/FlowController.java | 5 +-
.../nifi/remote/StandardRemoteProcessGroup.java | 14 +-
.../nifi/remote/StandardRemoteGroupPort.java | 63 +-
.../SSLSocketChannelCommunicationsSession.java | 93 --
.../io/socket/ssl/SSLSocketChannelInput.java | 50 --
.../io/socket/ssl/SSLSocketChannelOutput.java | 44 -
.../socket/SocketFlowFileServerProtocol.java | 2 +-
31 files changed, 1250 insertions(+), 1194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 29af777..dda5ae3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -18,10 +18,10 @@ package org.apache.nifi.remote;
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
public class Peer {
@@ -29,7 +29,8 @@ public class Peer {
private final String url;
private final String clusterUrl;
private final String host;
- private long penalizationExpiration = 0L;
+
+ private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false;
public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
@@ -61,19 +62,31 @@ public class Peer {
// Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
try {
- StreamUtils.copy(commsSession.getInput().getInputStream(), new NullOutputStream());
+ commsSession.getInput().consume();
} finally {
commsSession.close();
}
}
- public void penalize(final long millis) {
- penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
+ /**
+ * Penalizes this peer for the given destination only for the provided number of milliseconds
+ * @param destinationId
+ * @param millis
+ */
+ public void penalize(final String destinationId, final long millis) {
+ final Long currentPenalty = penaltyExpirationMap.get(destinationId);
+ final long proposedPenalty = System.currentTimeMillis() + millis;
+ if ( currentPenalty == null || proposedPenalty > currentPenalty ) {
+ penaltyExpirationMap.put(destinationId, proposedPenalty);
+ }
}
+
- public boolean isPenalized() {
- return penalizationExpiration > System.currentTimeMillis();
+ public boolean isPenalized(final String destinationId) {
+ final Long currentPenalty = penaltyExpirationMap.get(destinationId);
+ return (currentPenalty != null && currentPenalty > System.currentTimeMillis());
}
+
public boolean isClosed() {
return closed;
@@ -110,8 +123,6 @@ public class Peer {
sb.append("Peer[url=").append(url);
if (closed) {
sb.append(",CLOSED");
- } else if (isPenalized()) {
- sb.append(",PENALIZED");
}
sb.append("]");
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index cc16625..9fb6147 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -137,6 +137,16 @@ public interface Transaction {
void complete(boolean requestBackoff) throws IOException;
/**
+ * <p>
+ * Completes the transaction and indicates to both the sender and receiver that the data transfer was
+ * successful.
+ * </p>
+ *
+ * @throws IOException
+ */
+ void complete() throws IOException;
+
+ /**
* <p>
* Cancels this transaction, indicating to the sender that the data has not been successfully received so that
* the sender can retry or handle however is appropriate.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index fa94b81..47568fd 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -27,6 +27,10 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
/**
@@ -65,18 +69,24 @@ import org.apache.nifi.remote.protocol.DataPacket;
public interface SiteToSiteClient extends Closeable {
/**
+ * <p>
* Creates a new Transaction that can be used to either send data to a remote NiFi instance
* or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument.
+ * </p>
*
+ * <p>
+ * <b>Note:</b> If all of the nodes are penalized (See {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then
+ * this method will return <code>null</code>.
+ * </p>
*
* @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND}
* indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates
* that this Transaction will be used to receive data from the remote instance.
*
- * @return
+ * @return a Transaction to use for sending or receiving data, or <code>null</code> if all nodes are penalized.
* @throws IOException
*/
- Transaction createTransaction(TransferDirection direction) throws IOException;
+ Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException;
/**
* <p>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
new file mode 100644
index 0000000..651ae50
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+
+public class EndpointConnection {
+ private final Peer peer;
+ private final SocketClientProtocol socketClientProtocol;
+ private final FlowFileCodec codec;
+ private volatile long lastUsed;
+
+ public EndpointConnection(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
+ this.peer = peer;
+ this.socketClientProtocol = socketClientProtocol;
+ this.codec = codec;
+ }
+
+ public FlowFileCodec getCodec() {
+ return codec;
+ }
+
+ public SocketClientProtocol getSocketClientProtocol() {
+ return socketClientProtocol;
+ }
+
+ public Peer getPeer() {
+ return peer;
+ }
+
+ public void setLastTimeUsed() {
+ lastUsed = System.currentTimeMillis();
+ }
+
+ public long getLastTimeUsed() {
+ return lastUsed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
new file mode 100644
index 0000000..6869cca
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -0,0 +1,855 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.SSLContext;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EndpointConnectionPool {
+ public static final long PEER_REFRESH_PERIOD = 60000L;
+ public static final String CATEGORY = "Site-to-Site";
+ public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+ private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
+
+ private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
+
+ private final BlockingQueue<EndpointConnection> connectionQueue = new LinkedBlockingQueue<>();
+ private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
+ private final URI clusterUrl;
+ private final String apiUri;
+
+ private final AtomicLong peerIndex = new AtomicLong(0L);
+
+ private final ReentrantLock peerRefreshLock = new ReentrantLock();
+ private volatile List<PeerStatus> peerStatuses;
+ private volatile long peerRefreshTime = 0L;
+ private volatile PeerStatusCache peerStatusCache;
+ private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
+
+ private final File peersFile;
+ private final EventReporter eventReporter;
+ private final SSLContext sslContext;
+ private final ScheduledExecutorService taskExecutor;
+
+ private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
+ private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
+ private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
+ private Integer siteToSitePort;
+ private Boolean siteToSiteSecure;
+ private long remoteRefreshTime;
+ private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
+ private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
+
+ private volatile int commsTimeout;
+
+ public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
+ this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
+ }
+
+ public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+ try {
+ this.clusterUrl = new URI(clusterUrl);
+ } catch (final URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
+ }
+
+ // Trim the trailing /
+ String uriPath = this.clusterUrl.getPath();
+ if (uriPath.endsWith("/")) {
+ uriPath = uriPath.substring(0, uriPath.length() - 1);
+ }
+ apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
+
+ this.sslContext = sslContext;
+ this.peersFile = persistenceFile;
+ this.eventReporter = eventReporter;
+ this.commsTimeout = commsTimeoutMillis;
+
+ Set<PeerStatus> recoveredStatuses;
+ if ( persistenceFile != null && persistenceFile.exists() ) {
+ try {
+ recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
+ this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified());
+ } catch (final IOException ioe) {
+ logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
+ }
+ } else {
+ peerStatusCache = null;
+ }
+
+ // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
+ // connections and keep our list of peers up-to-date.
+ taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thread = defaultFactory.newThread(r);
+ thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
+ return thread;
+ }
+ });
+
+ taskExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ refreshPeers();
+ }
+ }, 0, 5, TimeUnit.SECONDS);
+
+ taskExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ cleanupExpiredSockets();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ }
+
+
+ public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ return getEndpointConnection(remoteDestination, direction, null);
+ }
+
+
+ public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ //
+ // Attempt to get a connection state that already exists for this URL.
+ //
+ FlowFileCodec codec = null;
+ CommunicationsSession commsSession = null;
+ SocketClientProtocol protocol = null;
+ EndpointConnection connection;
+ Peer peer = null;
+
+ final List<EndpointConnection> addBack = new ArrayList<>();
+ try {
+ do {
+ connection = connectionQueue.poll();
+ logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
+
+ if ( connection == null && !addBack.isEmpty() ) {
+ // all available connections have been penalized.
+ logger.debug("{} all Connections for {} are penalized; returning no Connection", this, remoteDestination.getIdentifier());
+ return null;
+ }
+
+ if ( connection != null && connection.getPeer().isPenalized(remoteDestination.getIdentifier()) ) {
+ // we have a connection, but it's penalized. We want to add it back to the queue
+ // when we've found one to use.
+ addBack.add(connection);
+ continue;
+ }
+
+ // if we can't get an existing Connection, create one
+ if ( connection == null ) {
+ logger.debug("No Connection available for Port {}; creating new Connection", remoteDestination.getIdentifier());
+ protocol = new SocketClientProtocol();
+ protocol.setDestination(remoteDestination);
+
+ final PeerStatus peerStatus = getNextPeerStatus(direction);
+ if ( peerStatus == null ) {
+ return null;
+ }
+
+ try {
+ commsSession = establishSiteToSiteConnection(peerStatus);
+ } catch (final IOException ioe) {
+ // TODO: penalize peer status
+ penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ throw ioe;
+ }
+
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ try {
+ RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
+ } catch (final HandshakeException e) {
+ try {
+ commsSession.close();
+ } catch (final IOException ioe) {
+ throw e;
+ }
+ }
+
+ final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
+ peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
+
+ // set properties based on config
+ if ( config != null ) {
+ protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
+ protocol.setPreferredBatchCount(config.getPreferredBatchCount());
+ protocol.setPreferredBatchSize(config.getPreferredBatchSize());
+ protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
+ }
+
+ // perform handshake
+ try {
+ protocol.handshake(peer);
+
+ // handle error cases
+ if ( protocol.isDestinationFull() ) {
+ logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
+ penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ connectionQueue.offer(connection);
+ continue;
+ } else if ( protocol.isPortInvalid() ) {
+ penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ cleanup(protocol, peer);
+ throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running");
+ } else if ( protocol.isPortUnknown() ) {
+ penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ cleanup(protocol, peer);
+ throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known");
+ }
+
+ // negotiate the FlowFileCodec to use
+ codec = protocol.negotiateCodec(peer);
+ } catch (final PortNotRunningException | UnknownPortException e) {
+ throw e;
+ } catch (final Exception e) {
+ penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ cleanup(protocol, peer);
+
+ final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
+ logger.error(message);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ throw e;
+ }
+
+ connection = new EndpointConnection(peer, protocol, codec);
+ } else {
+ final long lastTimeUsed = connection.getLastTimeUsed();
+ final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
+
+ if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) {
+ cleanup(connection.getSocketClientProtocol(), connection.getPeer());
+ connection = null;
+ } else {
+ codec = connection.getCodec();
+ peer = connection.getPeer();
+ commsSession = peer.getCommunicationsSession();
+ protocol = connection.getSocketClientProtocol();
+ }
+ }
+ } while ( connection == null || codec == null || commsSession == null || protocol == null );
+ } finally {
+ if ( !addBack.isEmpty() ) {
+ connectionQueue.addAll(addBack);
+ }
+ }
+
+ return connection;
+ }
+
+
+ public boolean offer(final EndpointConnection endpointConnection) {
+ final Peer peer = endpointConnection.getPeer();
+ if ( peer == null ) {
+ return false;
+ }
+
+ final String url = peer.getUrl();
+ if ( url == null ) {
+ return false;
+ }
+
+ return connectionQueue.offer(endpointConnection);
+ }
+
+ private void penalize(final PeerStatus status, final long penalizationMillis) {
+ Long expiration = peerTimeoutExpirations.get(status);
+ if ( expiration == null ) {
+ expiration = Long.valueOf(0L);
+ }
+
+ final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
+ peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
+ }
+
+ /**
+ * Updates internal state map to penalize a PeerStatus that points to the specified peer
+ * @param peer
+ */
+ public void penalize(final Peer peer, final long penalizationMillis) {
+ String host;
+ int port;
+ try {
+ final URI uri = new URI(peer.getUrl());
+ host = uri.getHost();
+ port = uri.getPort();
+ } catch (final URISyntaxException e) {
+ host = peer.getHost();
+ port = -1;
+ }
+
+ final PeerStatus status = new PeerStatus(host, port, true, 1);
+ penalize(status, penalizationMillis);
+ }
+
+ private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
+ if ( protocol != null && peer != null ) {
+ try {
+ protocol.shutdown(peer);
+ } catch (final TransmissionDisabledException e) {
+ // User disabled transmission.... do nothing.
+ logger.debug(this + " Transmission Disabled by User");
+ } catch (IOException e1) {
+ }
+ }
+
+ if ( peer != null ) {
+ try {
+ peer.close();
+ } catch (final TransmissionDisabledException e) {
+ // User disabled transmission.... do nothing.
+ logger.debug(this + " Transmission Disabled by User");
+ } catch (IOException e1) {
+ }
+ }
+ }
+
+ private PeerStatus getNextPeerStatus(final TransferDirection direction) {
+ List<PeerStatus> peerList = peerStatuses;
+ if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) ) {
+ peerRefreshLock.lock();
+ try {
+ try {
+ peerList = createPeerStatusList(direction);
+ } catch (final Exception e) {
+ final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
+ logger.warn(message);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+
+ if ( eventReporter != null ) {
+ eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
+ }
+ }
+
+ this.peerStatuses = peerList;
+ peerRefreshTime = System.currentTimeMillis();
+ } finally {
+ peerRefreshLock.unlock();
+ }
+ }
+
+ if ( peerList == null || peerList.isEmpty() ) {
+ return null;
+ }
+
+ PeerStatus peerStatus;
+ for (int i=0; i < peerList.size(); i++) {
+ final long idx = peerIndex.getAndIncrement();
+ final int listIndex = (int) (idx % peerList.size());
+ peerStatus = peerList.get(listIndex);
+
+ if ( isPenalized(peerStatus) ) {
+ logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
+ } else {
+ return peerStatus;
+ }
+ }
+
+ logger.debug("{} All peers appear to be penalized; returning null", this);
+ return null;
+ }
+
+ private boolean isPenalized(final PeerStatus peerStatus) {
+ final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
+ return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() );
+ }
+
+ private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
+ Set<PeerStatus> statuses = getPeerStatuses();
+ if ( statuses == null ) {
+ refreshPeers();
+ statuses = getPeerStatuses();
+ if ( statuses == null ) {
+ logger.debug("{} found no peers to connect to", this);
+ return Collections.emptyList();
+ }
+ }
+
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> nodeInfos = new ArrayList<>();
+ for ( final PeerStatus peerStatus : statuses ) {
+ final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
+ nodeInfos.add(nodeInfo);
+ }
+ clusterNodeInfo.setNodeInformation(nodeInfos);
+ return formulateDestinationList(clusterNodeInfo, direction);
+ }
+
+
+ private Set<PeerStatus> getPeerStatuses() {
+ final PeerStatusCache cache = this.peerStatusCache;
+ if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
+ return null;
+ }
+
+ if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
+ final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
+ for (final PeerStatus status : cache.getStatuses()) {
+ final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
+ equalizedSet.add(equalizedStatus);
+ }
+
+ return equalizedSet;
+ }
+
+ return cache.getStatuses();
+ }
+
+ private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
+ final String hostname = clusterUrl.getHost();
+ final int port = getSiteToSitePort();
+
+ final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
+ final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
+ final SocketClientProtocol clientProtocol = new SocketClientProtocol();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
+
+ clientProtocol.setTimeout(commsTimeout);
+ clientProtocol.handshake(peer, null);
+ final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
+ persistPeerStatuses(peerStatuses);
+
+ try {
+ clientProtocol.shutdown(peer);
+ } catch (final IOException e) {
+ final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
+ logger.warn(message);
+ if (logger.isDebugEnabled()) {
+ logger.warn("", e);
+ }
+ }
+
+ try {
+ peer.close();
+ } catch (final IOException e) {
+ final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
+ logger.warn(message);
+ if (logger.isDebugEnabled()) {
+ logger.warn("", e);
+ }
+ }
+
+ return peerStatuses;
+ }
+
+
+ private void persistPeerStatuses(final Set<PeerStatus> statuses) {
+ if ( peersFile == null ) {
+ return;
+ }
+
+ try (final OutputStream fos = new FileOutputStream(peersFile);
+ final OutputStream out = new BufferedOutputStream(fos)) {
+
+ for (final PeerStatus status : statuses) {
+ final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
+ out.write(line.getBytes(StandardCharsets.UTF_8));
+ }
+
+ } catch (final IOException e) {
+ logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
+ }
+ }
+
+ private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
+ if (!file.exists()) {
+ return null;
+ }
+
+ final Set<PeerStatus> statuses = new HashSet<>();
+ try (final InputStream fis = new FileInputStream(file);
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String[] splits = line.split(Pattern.quote(":"));
+ if (splits.length != 3) {
+ continue;
+ }
+
+ final String hostname = splits[0];
+ final int port = Integer.parseInt(splits[1]);
+ final boolean secure = Boolean.parseBoolean(splits[2]);
+
+ statuses.add(new PeerStatus(hostname, port, secure, 1));
+ }
+ }
+
+ return statuses;
+ }
+
+
+ private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
+ return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
+ }
+
+ private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
+ final boolean siteToSiteSecure = isSecure();
+ final String destinationUri = "nifi://" + hostname + ":" + port;
+
+ CommunicationsSession commsSession = null;
+ try {
+ if ( siteToSiteSecure ) {
+ if ( sslContext == null ) {
+ throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
+ }
+
+ final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+ socketChannel.connect();
+
+ commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+
+ try {
+ commsSession.setUserDn(socketChannel.getDn());
+ } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
+ throw new IOException(ex);
+ }
+ } else {
+ final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+ commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
+ }
+
+ commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+ commsSession.setUri(destinationUri);
+ } catch (final IOException ioe) {
+ if ( commsSession != null ) {
+ commsSession.close();
+ }
+
+ throw ioe;
+ }
+
+ return commsSession;
+ }
+
+
+ static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
+ final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
+ final int numDestinations = Math.max(128, nodeInfoSet.size());
+ final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();
+
+ long totalFlowFileCount = 0L;
+ for (final NodeInformation nodeInfo : nodeInfoSet) {
+ totalFlowFileCount += nodeInfo.getTotalFlowFiles();
+ }
+
+ int totalEntries = 0;
+ for (final NodeInformation nodeInfo : nodeInfoSet) {
+ final int flowFileCount = nodeInfo.getTotalFlowFiles();
+ // don't allow any node to get more than 80% of the data
+ final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
+ final double relativeWeighting = (direction == TransferDirection.RECEIVE) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
+ final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
+
+ entryCountMap.put(nodeInfo, Math.max(1, entries));
+ totalEntries += entries;
+ }
+
+ final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
+ for (int i=0; i < totalEntries; i++) {
+ destinations.add(null);
+ }
+ for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
+ final NodeInformation nodeInfo = entry.getKey();
+ final int numEntries = entry.getValue();
+
+ int skipIndex = numEntries;
+ for (int i=0; i < numEntries; i++) {
+ int n = (skipIndex * i);
+ while (true) {
+ final int index = n % destinations.size();
+ PeerStatus status = destinations.get(index);
+ if ( status == null ) {
+ status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
+ destinations.set(index, status);
+ break;
+ } else {
+ n++;
+ }
+ }
+ }
+ }
+
+ final StringBuilder distributionDescription = new StringBuilder();
+ distributionDescription.append("New Weighted Distribution of Nodes:");
+ for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
+ final double percentage = entry.getValue() * 100D / (double) destinations.size();
+ distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
+ }
+ logger.info(distributionDescription.toString());
+
+ // Jumble the list of destinations.
+ return destinations;
+ }
+
+
+ private void cleanupExpiredSockets() {
+ final List<EndpointConnection> states = new ArrayList<>();
+
+ EndpointConnection state;
+ while ((state = connectionQueue.poll()) != null) {
+ // If the socket has not been used in 10 seconds, shut it down.
+ final long lastUsed = state.getLastTimeUsed();
+ if ( lastUsed < System.currentTimeMillis() - 10000L ) {
+ try {
+ state.getSocketClientProtocol().shutdown(state.getPeer());
+ } catch (final Exception e) {
+ logger.debug("Failed to shut down {} using {} due to {}",
+ new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
+ }
+
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
+ } else {
+ states.add(state);
+ }
+ }
+
+ connectionQueue.addAll(states);
+ }
+
+ public void shutdown() {
+ taskExecutor.shutdown();
+ peerTimeoutExpirations.clear();
+
+ for ( final CommunicationsSession commsSession : activeCommsChannels ) {
+ commsSession.interrupt();
+ }
+
+ EndpointConnection state;
+ while ( (state = connectionQueue.poll()) != null) {
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
+ }
+ }
+
+ public void terminate(final EndpointConnection state) {
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
+ }
+
+ private void refreshPeers() {
+ final PeerStatusCache existingCache = peerStatusCache;
+ if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
+ return;
+ }
+
+ try {
+ final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
+ peerStatusCache = new PeerStatusCache(statuses);
+ logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
+ } catch (Exception e) {
+ logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
+ if (logger.isDebugEnabled()) {
+ logger.warn("", e);
+ }
+ }
+ }
+
+
+ public String getInputPortIdentifier(final String portName) throws IOException {
+ return getPortIdentifier(portName, inputPortMap);
+ }
+
+ public String getOutputPortIdentifier(final String portName) throws IOException {
+ return getPortIdentifier(portName, outputPortMap);
+ }
+
+
+ private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException {
+ String identifier;
+ remoteInfoReadLock.lock();
+ try {
+ identifier = portMap.get(portName);
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ if ( identifier != null ) {
+ return identifier;
+ }
+
+ refreshRemoteInfo();
+
+ remoteInfoReadLock.lock();
+ try {
+ return portMap.get(portName);
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+ }
+
+
+ private ControllerDTO refreshRemoteInfo() throws IOException {
+ final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null);
+ final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
+
+ remoteInfoWriteLock.lock();
+ try {
+ this.siteToSitePort = controller.getRemoteSiteListeningPort();
+ this.siteToSiteSecure = controller.isSiteToSiteSecure();
+
+ inputPortMap.clear();
+ for (final PortDTO inputPort : controller.getInputPorts()) {
+ inputPortMap.put(inputPort.getName(), inputPort.getId());
+ }
+
+ outputPortMap.clear();
+ for ( final PortDTO outputPort : controller.getOutputPorts()) {
+ outputPortMap.put(outputPort.getName(), outputPort.getId());
+ }
+
+ this.remoteRefreshTime = System.currentTimeMillis();
+ } finally {
+ remoteInfoWriteLock.unlock();
+ }
+
+ return controller;
+ }
+
+ /**
+ * @return the port that the remote instance is listening on for
+ * site-to-site communication, or <code>null</code> if the remote instance
+ * is not configured to allow site-to-site communications.
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ private Integer getSiteToSitePort() throws IOException {
+ Integer listeningPort;
+ remoteInfoReadLock.lock();
+ try {
+ listeningPort = this.siteToSitePort;
+ if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return listeningPort;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ listeningPort = controller.getRemoteSiteListeningPort();
+
+ return listeningPort;
+ }
+
+
+
+
+ /**
+ * Returns {@code true} if the remote instance is configured for secure site-to-site communications,
+ * {@code false} otherwise.
+ *
+ * @return
+ * @throws IOException
+ */
+ public boolean isSecure() throws IOException {
+ remoteInfoReadLock.lock();
+ try {
+ final Boolean secure = this.siteToSiteSecure;
+ if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return secure;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ final Boolean isSecure = controller.isSiteToSiteSecure();
+ if ( isSecure == null ) {
+ throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
+ }
+
+ return isSecure;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
deleted file mode 100644
index f4ac727..0000000
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client.socket;
-
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
-
-public class EndpointConnectionState {
- private final Peer peer;
- private final SocketClientProtocol socketClientProtocol;
- private final FlowFileCodec codec;
- private volatile long lastUsed;
-
- public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
- this.peer = peer;
- this.socketClientProtocol = socketClientProtocol;
- this.codec = codec;
- }
-
- public FlowFileCodec getCodec() {
- return codec;
- }
-
- public SocketClientProtocol getSocketClientProtocol() {
- return socketClientProtocol;
- }
-
- public Peer getPeer() {
- return peer;
- }
-
- public void setLastTimeUsed() {
- lastUsed = System.currentTimeMillis();
- }
-
- public long getLastTimeUsed() {
- return lastUsed;
- }
-}