You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:12:57 UTC
[07/49] incubator-nifi git commit: NIFI-271 checkpoint push because
there are so many changes. Long way to go but got through dto library
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 2b27de2..b0ce357 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -28,22 +28,22 @@ import org.apache.nifi.remote.protocol.ServerProtocol;
public class RemoteResourceFactory extends RemoteResourceInitiator {
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
public static <T extends FlowFileCodec> T receiveCodecNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
final String codecName = dis.readUTF();
final int version = dis.readInt();
-
+
final T codec = (T) RemoteResourceManager.createCodec(codecName, version);
final VersionNegotiator negotiator = codec.getVersionNegotiator();
- if ( negotiator.isVersionSupported(version) ) {
+ if (negotiator.isVersionSupported(version)) {
dos.write(RESOURCE_OK);
dos.flush();
-
+
negotiator.setVersion(version);
return codec;
} else {
final Integer preferred = negotiator.getPreferredVersion(version);
- if ( preferred == null ) {
+ if (preferred == null) {
dos.write(ABORT);
dos.flush();
throw new HandshakeException("Unable to negotiate an acceptable version of the FlowFileCodec " + codecName);
@@ -51,36 +51,36 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
dos.write(DIFFERENT_RESOURCE_VERSION);
dos.writeInt(preferred);
dos.flush();
-
+
return receiveCodecNegotiation(dis, dos);
}
- }
-
- public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
- dis.readUTF(); // read codec name
- dis.readInt(); // read codec version
-
- dos.write(ABORT);
- dos.writeUTF(explanation);
- dos.flush();
- }
-
- @SuppressWarnings("unchecked")
+ }
+
+ public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
+ dis.readUTF(); // read codec name
+ dis.readInt(); // read codec version
+
+ dos.write(ABORT);
+ dos.writeUTF(explanation);
+ dos.flush();
+ }
+
+ @SuppressWarnings("unchecked")
public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
final String protocolName = dis.readUTF();
final int version = dis.readInt();
-
+
final T protocol = (T) RemoteResourceManager.createClientProtocol(protocolName);
final VersionNegotiator negotiator = protocol.getVersionNegotiator();
- if ( negotiator.isVersionSupported(version) ) {
+ if (negotiator.isVersionSupported(version)) {
dos.write(RESOURCE_OK);
dos.flush();
-
+
negotiator.setVersion(version);
return protocol;
} else {
final Integer preferred = negotiator.getPreferredVersion(version);
- if ( preferred == null ) {
+ if (preferred == null) {
dos.write(ABORT);
dos.flush();
throw new HandshakeException("Unable to negotiate an acceptable version of the ClientProtocol " + protocolName);
@@ -88,28 +88,27 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
dos.write(DIFFERENT_RESOURCE_VERSION);
dos.writeInt(preferred);
dos.flush();
-
+
return receiveClientProtocolNegotiation(dis, dos);
}
}
-
-
- @SuppressWarnings("unchecked")
+
+ @SuppressWarnings("unchecked")
public static <T extends ServerProtocol> T receiveServerProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
- final String protocolName = dis.readUTF();
+ final String protocolName = dis.readUTF();
final int version = dis.readInt();
-
+
final T protocol = (T) RemoteResourceManager.createServerProtocol(protocolName);
final VersionNegotiator negotiator = protocol.getVersionNegotiator();
- if ( negotiator.isVersionSupported(version) ) {
+ if (negotiator.isVersionSupported(version)) {
dos.write(RESOURCE_OK);
dos.flush();
-
+
negotiator.setVersion(version);
return protocol;
} else {
final Integer preferred = negotiator.getPreferredVersion(version);
- if ( preferred == null ) {
+ if (preferred == null) {
dos.write(ABORT);
dos.flush();
throw new HandshakeException("Unable to negotiate an acceptable version of the ServerProtocol " + protocolName);
@@ -117,54 +116,53 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
dos.write(DIFFERENT_RESOURCE_VERSION);
dos.writeInt(preferred);
dos.flush();
-
+
return receiveServerProtocolNegotiation(dis, dos);
}
}
-
-
-
-
- public static <T extends VersionedRemoteResource> T receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] constructorArgs) throws IOException, HandshakeException {
- final String resourceClassName = dis.readUTF();
- final T resource;
- try {
+
+ public static <T extends VersionedRemoteResource> T
+ receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] constructorArgs)
+ throws IOException, HandshakeException {
+ final String resourceClassName = dis.readUTF();
+ final T resource;
+ try {
@SuppressWarnings("unchecked")
- final Class<T> resourceClass = (Class<T>) Class.forName(resourceClassName);
- if ( !cls.isAssignableFrom(resourceClass) ) {
- throw new HandshakeException("Expected to negotiate a Versioned Resource of type " + cls.getName() + " but received class name of " + resourceClassName);
+ final Class<T> resourceClass = (Class<T>) Class.forName(resourceClassName);
+ if (!cls.isAssignableFrom(resourceClass)) {
+ throw new HandshakeException("Expected to negotiate a Versioned Resource of type " + cls.getName() + " but received class name of " + resourceClassName);
}
-
+
final Constructor<T> ctr = resourceClass.getConstructor(constructorArgClasses);
resource = ctr.newInstance(constructorArgs);
} catch (final Throwable t) {
- dos.write(ABORT);
- final String errorMsg = "Unable to instantiate Versioned Resource of type " + resourceClassName;
- dos.writeUTF(errorMsg);
- dos.flush();
- throw new HandshakeException(errorMsg);
+ dos.write(ABORT);
+ final String errorMsg = "Unable to instantiate Versioned Resource of type " + resourceClassName;
+ dos.writeUTF(errorMsg);
+ dos.flush();
+ throw new HandshakeException(errorMsg);
}
-
+
final int version = dis.readInt();
final VersionNegotiator negotiator = resource.getVersionNegotiator();
- if ( negotiator.isVersionSupported(version) ) {
+ if (negotiator.isVersionSupported(version)) {
dos.write(RESOURCE_OK);
dos.flush();
-
+
negotiator.setVersion(version);
return resource;
} else {
final Integer preferred = negotiator.getPreferredVersion(version);
- if ( preferred == null ) {
- dos.write(ABORT);
- dos.flush();
- throw new HandshakeException("Unable to negotiate an acceptable version of the resource " + resourceClassName);
+ if (preferred == null) {
+ dos.write(ABORT);
+ dos.flush();
+ throw new HandshakeException("Unable to negotiate an acceptable version of the resource " + resourceClassName);
}
dos.write(DIFFERENT_RESOURCE_VERSION);
dos.writeInt(preferred);
dos.flush();
-
+
return receiveResourceNegotiation(cls, dis, dos, constructorArgClasses, constructorArgs);
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
index f86f066..8bbe7aa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
@@ -34,20 +34,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RemoteResourceManager {
+
private static final Map<String, Class<? extends FlowFileCodec>> codecClassMap;
private static final Map<String, Class<? extends ServerProtocol>> desiredServerProtocolClassMap = new ConcurrentHashMap<>();
private static final Map<String, Class<? extends ClientProtocol>> desiredClientProtocolClassMap = new ConcurrentHashMap<>();
-
+
private static final Map<String, Set<Class<? extends ServerProtocol>>> serverProtocolClassMap;
private static final Map<String, Set<Class<? extends ClientProtocol>>> clientProtocolClassMap;
-
+
private static final Logger logger = LoggerFactory.getLogger(RemoteResourceManager.class);
-
+
static {
final Map<String, Class<? extends FlowFileCodec>> codecMap = new HashMap<>();
final Map<String, Set<Class<? extends ServerProtocol>>> serverProtocolMap = new HashMap<>();
final Map<String, Set<Class<? extends ClientProtocol>>> clientProtocolMap = new HashMap<>();
-
+
// load all of the FlowFileCodecs that we know
final ClassLoader classLoader = RemoteResourceManager.class.getClassLoader();
final ServiceLoader<FlowFileCodec> flowFileCodecLoader = ServiceLoader.load(FlowFileCodec.class, classLoader);
@@ -58,12 +59,12 @@ public class RemoteResourceManager {
final String codecName = codec.getResourceName();
final Class<? extends FlowFileCodec> previousValue = codecMap.put(codecName, clazz);
- if ( previousValue != null ) {
- logger.warn("Multiple FlowFileCodec's found with name {}; choosing to use {} in place of {}",
- new Object[] {codecName, clazz.getName(), previousValue.getName()});
+ if (previousValue != null) {
+ logger.warn("Multiple FlowFileCodec's found with name {}; choosing to use {} in place of {}",
+ new Object[]{codecName, clazz.getName(), previousValue.getName()});
}
}
-
+
final ServiceLoader<ServerProtocol> serverProtocolLoader = ServiceLoader.load(ServerProtocol.class, classLoader);
final Iterator<ServerProtocol> serverItr = serverProtocolLoader.iterator();
while (serverItr.hasNext()) {
@@ -72,14 +73,14 @@ public class RemoteResourceManager {
final String protocolName = protocol.getResourceName();
Set<Class<? extends ServerProtocol>> classSet = serverProtocolMap.get(protocolName);
- if ( classSet == null ) {
+ if (classSet == null) {
classSet = new HashSet<>();
serverProtocolMap.put(protocolName, classSet);
}
-
+
classSet.add(clazz);
}
-
+
final ServiceLoader<ClientProtocol> clientProtocolLoader = ServiceLoader.load(ClientProtocol.class, classLoader);
final Iterator<ClientProtocol> clientItr = clientProtocolLoader.iterator();
while (clientItr.hasNext()) {
@@ -88,133 +89,132 @@ public class RemoteResourceManager {
final String protocolName = protocol.getResourceName();
Set<Class<? extends ClientProtocol>> classSet = clientProtocolMap.get(protocolName);
- if ( classSet == null ) {
+ if (classSet == null) {
classSet = new HashSet<>();
clientProtocolMap.put(protocolName, classSet);
}
-
+
classSet.add(clazz);
}
-
+
codecClassMap = Collections.unmodifiableMap(codecMap);
clientProtocolClassMap = Collections.unmodifiableMap(clientProtocolMap);
serverProtocolClassMap = Collections.unmodifiableMap(serverProtocolMap);
}
-
public static boolean isCodecSupported(final String codecName) {
return codecClassMap.containsKey(codecName);
}
-
+
public static boolean isCodecSupported(final String codecName, final int version) {
- if ( !isCodecSupported(codecName) ) {
+ if (!isCodecSupported(codecName)) {
return false;
}
-
+
final FlowFileCodec codec = createCodec(codecName);
final VersionNegotiator negotiator = codec.getVersionNegotiator();
return (negotiator.isVersionSupported(version));
}
-
+
public static FlowFileCodec createCodec(final String codecName, final int version) {
final FlowFileCodec codec = createCodec(codecName);
final VersionNegotiator negotiator = codec.getVersionNegotiator();
- if ( !negotiator.isVersionSupported(version) ) {
+ if (!negotiator.isVersionSupported(version)) {
throw new IllegalArgumentException("FlowFile Codec " + codecName + " does not support version " + version);
}
-
+
negotiator.setVersion(version);
return codec;
}
-
+
private static FlowFileCodec createCodec(final String codecName) {
final Class<? extends FlowFileCodec> codecClass = codecClassMap.get(codecName);
- if ( codecClass == null ) {
+ if (codecClass == null) {
throw new IllegalArgumentException("Unknown Codec: " + codecName);
}
-
+
try {
return codecClass.newInstance();
} catch (final Exception e) {
throw new RuntimeException("Unable to instantiate class " + codecClass.getName(), e);
}
}
-
+
public static Set<String> getSupportedCodecNames() {
return codecClassMap.keySet();
}
-
+
public static List<Integer> getSupportedVersions(final String codecName) {
final FlowFileCodec codec = createCodec(codecName);
return codec.getSupportedVersions();
}
-
+
public static Set<Class<? extends ClientProtocol>> getClientProtocolClasses(final String protocolName) {
final Set<Class<? extends ClientProtocol>> classes = clientProtocolClassMap.get(protocolName);
- if ( classes == null ) {
+ if (classes == null) {
return new HashSet<>();
}
return new HashSet<>(classes);
}
-
+
public static Set<Class<? extends ServerProtocol>> getServerProtocolClasses(final String protocolName) {
final Set<Class<? extends ServerProtocol>> classes = serverProtocolClassMap.get(protocolName);
- if ( classes == null ) {
+ if (classes == null) {
return new HashSet<>();
}
return new HashSet<>(classes);
}
-
+
public static void setServerProtocolImplementation(final String protocolName, final Class<? extends ServerProtocol> clazz) {
desiredServerProtocolClassMap.put(protocolName, clazz);
}
-
+
public static void setClientProtocolImplementation(final String protocolName, final Class<? extends ClientProtocol> clazz) {
desiredClientProtocolClassMap.put(protocolName, clazz);
}
-
+
public static ServerProtocol createServerProtocol(final String protocolName) {
final Set<Class<? extends ServerProtocol>> classSet = getServerProtocolClasses(protocolName);
- if ( classSet.isEmpty() ) {
+ if (classSet.isEmpty()) {
throw new IllegalArgumentException("Unknkown Server Protocol: " + protocolName);
}
Class<? extends ServerProtocol> desiredClass = desiredServerProtocolClassMap.get(protocolName);
- if ( desiredClass == null && classSet.size() > 1 ) {
+ if (desiredClass == null && classSet.size() > 1) {
throw new IllegalStateException("Multiple implementations of Server Protocol " + protocolName + " were found and no preferred implementation has been specified");
}
-
- if ( desiredClass != null && !classSet.contains(desiredClass) ) {
+
+ if (desiredClass != null && !classSet.contains(desiredClass)) {
throw new IllegalStateException("Desired implementation of Server Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol is not registered as a Server Protocol");
}
-
- if ( desiredClass == null ) {
+
+ if (desiredClass == null) {
desiredClass = classSet.iterator().next();
}
-
+
try {
return desiredClass.newInstance();
} catch (final Exception e) {
throw new RuntimeException("Unable to instantiate class " + desiredClass.getName(), e);
- }
+ }
}
-
+
public static ClientProtocol createClientProtocol(final String protocolName) {
final Set<Class<? extends ClientProtocol>> classSet = getClientProtocolClasses(protocolName);
- if ( classSet.isEmpty() ) {
+ if (classSet.isEmpty()) {
throw new IllegalArgumentException("Unknkown Client Protocol: " + protocolName);
}
Class<? extends ClientProtocol> desiredClass = desiredClientProtocolClassMap.get(protocolName);
- if ( desiredClass == null && classSet.size() > 1 ) {
+ if (desiredClass == null && classSet.size() > 1) {
throw new IllegalStateException("Multiple implementations of Client Protocol " + protocolName + " were found and no preferred implementation has been specified");
}
-
- if ( desiredClass != null && !classSet.contains(desiredClass) ) {
+
+ if (desiredClass != null && !classSet.contains(desiredClass)) {
throw new IllegalStateException("Desired implementation of Client Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol is not registered as a Client Protocol");
}
-
- if ( desiredClass == null ) {
+
+ if (desiredClass == null) {
desiredClass = classSet.iterator().next();
}
@@ -222,6 +222,6 @@ public class RemoteResourceManager {
return desiredClass.newInstance();
} catch (final Exception e) {
throw new RuntimeException("Unable to instantiate class " + desiredClass.getName(), e);
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
index 59e4d0a..6f7b977 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
@@ -30,4 +30,4 @@ public interface RemoteSiteListener {
void stop();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 493d1fe..809147e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -49,43 +49,42 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketRemoteSiteListener implements RemoteSiteListener {
+
public static final String DEFAULT_FLOWFILE_PATH = "./";
private final int socketPort;
private final SSLContext sslContext;
private final NodeInformant nodeInformant;
private final AtomicReference<ProcessGroup> rootGroup = new AtomicReference<>();
-
+
private final AtomicBoolean stopped = new AtomicBoolean(false);
-
+
private static final Logger LOG = LoggerFactory.getLogger(SocketRemoteSiteListener.class);
public SocketRemoteSiteListener(final int socketPort, final SSLContext sslContext) {
this(socketPort, sslContext, null);
}
-
+
public SocketRemoteSiteListener(final int socketPort, final SSLContext sslContext, final NodeInformant nodeInformant) {
this.socketPort = socketPort;
this.sslContext = sslContext;
this.nodeInformant = nodeInformant;
}
-
@Override
public void setRootGroup(final ProcessGroup rootGroup) {
this.rootGroup.set(rootGroup);
}
-
@Override
public void start() throws IOException {
final boolean secure = (sslContext != null);
-
+
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
serverSocketChannel.bind(new InetSocketAddress(socketPort));
stopped.set(false);
-
+
final Thread listenerThread = new Thread(new Runnable() {
private int threadCount = 0;
@@ -95,19 +94,21 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final ProcessGroup processGroup = rootGroup.get();
// If nodeInformant is not null, we are in clustered mode, which means that we don't care about
// the processGroup.
- if ( (nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty())) ) {
- try { Thread.sleep(2000L); } catch (final Exception e) {}
+ if ((nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty()))) {
+ try {
+ Thread.sleep(2000L);
+ } catch (final Exception e) {
+ }
continue;
}
-
-
+
LOG.trace("Accepting Connection...");
Socket acceptedSocket = null;
try {
serverSocketChannel.configureBlocking(false);
final ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.setSoTimeout(2000);
- while ( !stopped.get() && acceptedSocket == null ) {
+ while (!stopped.get() && acceptedSocket == null) {
try {
acceptedSocket = serverSocket.accept();
} catch (final SocketTimeoutException ste) {
@@ -116,14 +117,14 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
}
} catch (final IOException e) {
LOG.error("RemoteSiteListener Unable to accept connection due to {}", e.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
continue;
}
LOG.trace("Got connection");
-
- if ( stopped.get() ) {
+
+ if (stopped.get()) {
return;
}
final Socket socket = acceptedSocket;
@@ -135,25 +136,25 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final InetAddress inetAddress = socket.getInetAddress();
String hostname = inetAddress.getHostName();
final int slashIndex = hostname.indexOf("/");
- if ( slashIndex == 0 ) {
+ if (slashIndex == 0) {
hostname = hostname.substring(1);
- } else if ( slashIndex > 0 ) {
+ } else if (slashIndex > 0) {
hostname = hostname.substring(0, slashIndex);
}
final int port = socket.getPort();
final String peerUri = "nifi://" + hostname + ":" + port;
LOG.debug("{} Connection URL is {}", this, peerUri);
-
+
final CommunicationsSession commsSession;
final String dn;
try {
- if ( secure ) {
+ if (secure) {
final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
LOG.trace("Channel is secure; connecting...");
sslSocketChannel.connect();
LOG.trace("Channel connected");
-
+
commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri);
dn = sslSocketChannel.getDn();
commsSession.setUserDn(dn);
@@ -164,7 +165,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
}
} catch (final Exception e) {
LOG.error("RemoteSiteListener Unable to accept connection from {} due to {}", socket, e.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
try {
@@ -173,135 +174,136 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
}
return;
}
-
+
LOG.info("Received connection from {}, User DN: {}", socket.getInetAddress(), dn);
-
+
final InputStream socketIn;
final OutputStream socketOut;
-
+
try {
socketIn = commsSession.getInput().getInputStream();
socketOut = commsSession.getOutput().getOutputStream();
} catch (final IOException e) {
- LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
- try {
- commsSession.close();
- } catch (final IOException ioe) {}
-
- return;
+ LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
+ try {
+ commsSession.close();
+ } catch (final IOException ioe) {
+ }
+
+ return;
}
-
+
final DataInputStream dis = new DataInputStream(socketIn);
- final DataOutputStream dos = new DataOutputStream(socketOut);
-
- ServerProtocol protocol = null;
- Peer peer = null;
+ final DataOutputStream dos = new DataOutputStream(socketOut);
+
+ ServerProtocol protocol = null;
+ Peer peer = null;
try {
- // ensure that we are communicating with another NiFi
+ // ensure that we are communicating with another NiFi
LOG.debug("Verifying magic bytes...");
- verifyMagicBytes(dis, peerUri);
-
- LOG.debug("Receiving Server Protocol Negotiation");
- protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
- protocol.setRootProcessGroup(rootGroup.get());
- protocol.setNodeInformant(nodeInformant);
-
- final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
- peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
- LOG.debug("Handshaking....");
- protocol.handshake(peer);
-
- if (!protocol.isHandshakeSuccessful()) {
- LOG.error("Handshake failed with {}; closing connection", peer);
- try {
- peer.close();
- } catch (final IOException e) {
- LOG.warn("Failed to close {} due to {}", peer, e);
- }
-
- // no need to shutdown protocol because we failed to perform handshake
- return;
- }
-
- commsSession.setTimeout((int) protocol.getRequestExpiration());
-
- LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[] {
- protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer});
-
- try {
- while (!protocol.isShutdown()) {
- LOG.trace("Getting Protocol Request Type...");
-
+ verifyMagicBytes(dis, peerUri);
+
+ LOG.debug("Receiving Server Protocol Negotiation");
+ protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
+ protocol.setRootProcessGroup(rootGroup.get());
+ protocol.setNodeInformant(nodeInformant);
+
+ final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
+ peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
+ LOG.debug("Handshaking....");
+ protocol.handshake(peer);
+
+ if (!protocol.isHandshakeSuccessful()) {
+ LOG.error("Handshake failed with {}; closing connection", peer);
+ try {
+ peer.close();
+ } catch (final IOException e) {
+ LOG.warn("Failed to close {} due to {}", peer, e);
+ }
+
+ // no need to shutdown protocol because we failed to perform handshake
+ return;
+ }
+
+ commsSession.setTimeout((int) protocol.getRequestExpiration());
+
+ LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[]{
+ protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer});
+
+ try {
+ while (!protocol.isShutdown()) {
+ LOG.trace("Getting Protocol Request Type...");
+
int timeoutCount = 0;
RequestType requestType = null;
-
- while ( requestType == null ) {
+
+ while (requestType == null) {
try {
requestType = protocol.getRequestType(peer);
} catch (final SocketTimeoutException e) {
// Give the timeout a bit longer (twice as long) to receive the Request Type,
// in order to attempt to receive more data without shutting down the socket if we don't
// have to.
- LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[] {this, protocol, peer});
+ LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[]{this, protocol, peer});
timeoutCount++;
requestType = null;
-
- if ( timeoutCount >= 2 ) {
+
+ if (timeoutCount >= 2) {
throw e;
}
}
}
-
+
LOG.debug("Request type from {} is {}", protocol, requestType);
- switch (requestType) {
- case NEGOTIATE_FLOWFILE_CODEC:
- protocol.negotiateCodec(peer);
- break;
- case RECEIVE_FLOWFILES:
- // peer wants to receive FlowFiles, so we will transfer FlowFiles.
- protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>());
- break;
- case SEND_FLOWFILES:
- // Peer wants to send FlowFiles, so we will receive.
+ switch (requestType) {
+ case NEGOTIATE_FLOWFILE_CODEC:
+ protocol.negotiateCodec(peer);
+ break;
+ case RECEIVE_FLOWFILES:
+ // peer wants to receive FlowFiles, so we will transfer FlowFiles.
+ protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>());
+ break;
+ case SEND_FLOWFILES:
+ // Peer wants to send FlowFiles, so we will receive.
protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap<String, String>());
- break;
- case REQUEST_PEER_LIST:
- protocol.sendPeerList(peer);
- break;
- case SHUTDOWN:
- protocol.shutdown(peer);
- break;
- }
- }
- LOG.debug("Finished communicating with {} ({})", peer, protocol);
- } catch (final Exception e) {
- LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString());
- if ( LOG.isDebugEnabled() ) {
- LOG.error("", e);
- }
- }
+ break;
+ case REQUEST_PEER_LIST:
+ protocol.sendPeerList(peer);
+ break;
+ case SHUTDOWN:
+ protocol.shutdown(peer);
+ break;
+ }
+ }
+ LOG.debug("Finished communicating with {} ({})", peer, protocol);
+ } catch (final Exception e) {
+ LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", e);
+ }
+ }
} catch (final IOException e) {
LOG.error("Unable to communicate with remote instance {} due to {}; closing connection", peer, e.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
} catch (final Throwable t) {
LOG.error("Handshake failed when communicating with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", t);
}
} finally {
LOG.trace("Cleaning up");
try {
- if ( protocol != null && peer != null ) {
+ if (protocol != null && peer != null) {
protocol.shutdown(peer);
}
} catch (final Exception protocolException) {
LOG.warn("Failed to shutdown protocol due to {}", protocolException.toString());
}
-
+
try {
- if ( peer != null ) {
+ if (peer != null) {
peer.close();
}
} catch (final Exception peerException) {
@@ -320,30 +322,30 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
listenerThread.setName("Site-to-Site Listener");
listenerThread.start();
}
-
+
@Override
public int getPort() {
return socketPort;
}
-
+
@Override
public void stop() {
stopped.set(true);
}
-
+
private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException, HandshakeException {
final byte[] receivedMagicBytes = new byte[CommunicationsSession.MAGIC_BYTES.length];
// expect magic bytes
try {
- for (int i=0; i < receivedMagicBytes.length; i++) {
+ for (int i = 0; i < receivedMagicBytes.length; i++) {
receivedMagicBytes[i] = (byte) in.read();
}
} catch (final EOFException e) {
throw new HandshakeException("Handshake failed (not enough bytes) when communicating with " + peerDescription);
}
-
- if ( !Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes) ) {
+
+ if (!Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes)) {
throw new HandshakeException("Handshake with " + peerDescription + " failed because the Magic Header was not present");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index eec6ed5..982d9ff 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -56,14 +56,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardRemoteGroupPort extends RemoteGroupPort {
+
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
public static final String USER_AGENT = "NiFi-Site-to-Site";
public static final String CONTENT_TYPE = "application/octet-stream";
-
+
public static final int GZIP_COMPRESSION_LEVEL = 1;
-
+
private static final String CATEGORY = "Site to Site";
-
+
private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
private final RemoteProcessGroup remoteGroup;
private final AtomicBoolean useCompression = new AtomicBoolean(false);
@@ -71,28 +72,27 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final AtomicBoolean targetRunning = new AtomicBoolean(true);
private final SSLContext sslContext;
private final TransferDirection transferDirection;
-
+
private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
-
-
- public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
+
+ public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
// remote group port id needs to be unique but cannot just be the id of the port
// in the remote group instance. this supports referencing the same remote
// instance more than once.
super(id, name, processGroup, type, scheduler);
-
+
this.remoteGroup = remoteGroup;
this.transferDirection = direction;
this.sslContext = sslContext;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
}
-
+
private static File getPeerPersistenceFile(final String portId) {
final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
return new File(stateDir, portId + ".peers");
}
-
+
@Override
public boolean isTargetRunning() {
return targetRunning.get();
@@ -101,18 +101,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public void setTargetRunning(boolean targetRunning) {
this.targetRunning.set(targetRunning);
}
-
+
@Override
public boolean isTriggerWhenEmpty() {
return getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT;
}
-
+
@Override
public void shutdown() {
- super.shutdown();
-
+ super.shutdown();
+
final SiteToSiteClient client = clientRef.get();
- if ( client != null ) {
+ if (client != null) {
try {
client.close();
} catch (final IOException ioe) {
@@ -120,58 +120,57 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
}
}
-
+
@Override
public void onSchedulingStart() {
super.onSchedulingStart();
-
+
final SiteToSiteClient client = new SiteToSiteClient.Builder()
- .url(remoteGroup.getTargetUri().toString())
- .portIdentifier(getIdentifier())
- .sslContext(sslContext)
- .eventReporter(remoteGroup.getEventReporter())
- .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
- .build();
+ .url(remoteGroup.getTargetUri().toString())
+ .portIdentifier(getIdentifier())
+ .sslContext(sslContext)
+ .eventReporter(remoteGroup.getEventReporter())
+ .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+ .build();
clientRef.set(client);
}
-
-
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
- if ( !remoteGroup.isTransmitting() ) {
+ if (!remoteGroup.isTransmitting()) {
logger.debug("{} {} is not transmitting; will not send/receive", this, remoteGroup);
return;
}
- if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0 ) {
+ if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0) {
logger.debug("{} No data to send", this);
return;
}
-
+
String url = getRemoteProcessGroup().getTargetUri().toString();
-
+
// If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
// we don't want to create a transaction at all.
final FlowFile firstFlowFile;
- if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+ if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
firstFlowFile = session.get();
- if ( firstFlowFile == null ) {
+ if (firstFlowFile == null) {
return;
}
} else {
firstFlowFile = null;
}
-
+
final SiteToSiteClient client = clientRef.get();
final Transaction transaction;
try {
- transaction = client.createTransaction(transferDirection);
+ transaction = client.createTransaction(transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url);
logger.error(message);
- session.rollback();
+ session.rollback();
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final UnknownPortException e) {
@@ -179,22 +178,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.targetExists.set(false);
final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url);
logger.error(message);
- session.rollback();
+ session.rollback();
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final IOException e) {
- context.yield();
+ context.yield();
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.error("", e);
}
- session.rollback();
+ session.rollback();
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
}
-
- if ( transaction == null ) {
+
+ if (transaction == null) {
logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
session.rollback();
context.yield();
@@ -202,11 +201,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
try {
- if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+ if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
transferFlowFiles(transaction, context, session, firstFlowFile);
} else {
final int numReceived = receiveFlowFiles(transaction, context, session);
- if ( numReceived == 0 ) {
+ if (numReceived == 0) {
context.yield();
}
}
@@ -215,24 +214,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} catch (final Throwable t) {
final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.error("", t);
}
-
+
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
transaction.error();
session.rollback();
}
}
-
@Override
public String getYieldPeriod() {
// delegate yield duration to remote process group
return remoteGroup.getYieldDuration();
}
-
-
+
private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
FlowFile flowFile = firstFlowFile;
@@ -241,7 +238,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final long startSendingNanos = System.nanoTime();
final StopWatch stopWatch = new StopWatch(true);
long bytesSent = 0L;
-
+
final Set<FlowFile> flowFilesSent = new HashSet<>();
boolean continueTransaction = true;
while (continueTransaction) {
@@ -255,79 +252,78 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
transaction.send(dataPacket);
}
});
-
+
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
+
flowFilesSent.add(flowFile);
bytesSent += flowFile.getSize();
logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
-
+
final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
session.remove(flowFile);
-
+
final long sendingNanos = System.nanoTime() - startSendingNanos;
- if ( sendingNanos < BATCH_SEND_NANOS ) {
+ if (sendingNanos < BATCH_SEND_NANOS) {
flowFile = session.get();
} else {
flowFile = null;
}
-
+
continueTransaction = (flowFile != null);
}
-
+
transaction.confirm();
-
+
// consume input stream entirely, ignoring its contents. If we
// don't do this, the Connection will not be returned to the pool
stopWatch.stop();
final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesSent);
-
+
session.commit();
transaction.complete();
-
+
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
- logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
-
+
return flowFilesSent.size();
} catch (final Exception e) {
session.rollback();
throw e;
}
-
}
-
+
private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
final String userDn = transaction.getCommunicant().getDistinguishedName();
-
+
final StopWatch stopWatch = new StopWatch(true);
final Set<FlowFile> flowFilesReceived = new HashSet<>();
long bytesReceived = 0L;
-
+
while (true) {
final long start = System.nanoTime();
final DataPacket dataPacket = transaction.receive();
- if ( dataPacket == null ) {
+ if (dataPacket == null) {
break;
}
-
+
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
flowFile = session.importFrom(dataPacket.getData(), flowFile);
final long receiveNanos = System.nanoTime() - start;
-
+
String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
- if ( sourceFlowFileIdentifier == null ) {
+ if (sourceFlowFileIdentifier == null) {
sourceFlowFileIdentifier = "<Unknown Identifier>";
}
-
+
final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier;
- session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
"Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
session.transfer(flowFile, Relationship.ANONYMOUS);
@@ -336,22 +332,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
// Confirm that what we received was the correct data.
transaction.confirm();
-
+
// Commit the session so that we have persisted the data
session.commit();
transaction.complete();
- if ( !flowFilesReceived.isEmpty() ) {
+ if (!flowFilesReceived.isEmpty()) {
stopWatch.stop();
final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
- this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate });
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+ this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
}
-
+
return flowFilesReceived.size();
}
@@ -371,44 +367,44 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
ValidationResult error = null;
if (!targetExists.get()) {
error = new ValidationResult.Builder()
- .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
- .subject(String.format("Remote port '%s'", getName()))
- .valid(false)
- .build();
- } else if ( getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty() ) {
+ .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+ .subject(String.format("Remote port '%s'", getName()))
+ .valid(false)
+ .build();
+ } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
error = new ValidationResult.Builder()
- .explanation(String.format("Port '%s' has no outbound connections", getName()))
- .subject(String.format("Remote port '%s'", getName()))
- .valid(false)
- .build();
+ .explanation(String.format("Port '%s' has no outbound connections", getName()))
+ .subject(String.format("Remote port '%s'", getName()))
+ .valid(false)
+ .build();
}
-
- if ( error != null ) {
+
+ if (error != null) {
validationErrors.add(error);
}
-
+
return validationErrors;
}
-
+
@Override
public void verifyCanStart() {
super.verifyCanStart();
-
- if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty() ) {
+
+ if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) {
throw new IllegalStateException("Port " + getName() + " has no incoming connections");
}
}
-
+
@Override
public void setUseCompression(final boolean useCompression) {
this.useCompression.set(useCompression);
}
-
+
@Override
public boolean isUseCompression() {
return useCompression.get();
}
-
+
@Override
public String toString() {
return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]";
@@ -418,34 +414,32 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public RemoteProcessGroup getRemoteProcessGroup() {
return remoteGroup;
}
-
+
@Override
public TransferDirection getTransferDirection() {
return (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) ? TransferDirection.SEND : TransferDirection.RECEIVE;
}
-
+
public void setTargetExists(final boolean exists) {
this.targetExists.set(exists);
}
-
+
@Override
public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
super.removeConnection(connection);
-
- // If the Port no longer exists on the remote instance and this is the last Connection, tell
+
+ // If the Port no longer exists on the remote instance and this is the last Connection, tell
// RemoteProcessGroup to remove me
- if ( !getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty() ) {
+ if (!getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty()) {
remoteGroup.removeNonExistentPort(this);
}
}
-
-
+
@Override
public SchedulingStrategy getSchedulingStrategy() {
return SchedulingStrategy.TIMER_DRIVEN;
}
-
-
+
@Override
public boolean isSideEffectFree() {
return false;