You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/21 01:08:46 UTC
incubator-nifi git commit: NIFI-282: Refactoring to make client from
site-to-site components
Repository: incubator-nifi
Updated Branches:
refs/heads/site-to-site-client a6293e340 -> c174d3a60
NIFI-282: Refactoring to make client from site-to-site components
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c174d3a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c174d3a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c174d3a6
Branch: refs/heads/site-to-site-client
Commit: c174d3a600358ebed8b8064247785606af6c6134
Parents: a6293e3
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jan 20 19:07:18 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Jan 20 19:07:18 2015 -0500
----------------------------------------------------------------------
nifi/commons/site-to-site-client/pom.xml | 13 +
.../apache/nifi/remote/client/DataPacket.java | 28 --
.../nifi/remote/client/SiteToSiteClient.java | 5 +-
.../apache/nifi/remote/client/Transaction.java | 21 ++
.../socket/EndpointConnectionStatePool.java | 309 +++++++++++++------
.../nifi/remote/client/socket/SocketClient.java | 151 ++++++++-
.../nifi/remote/protocol/ClientProtocol.java | 15 +
.../apache/nifi/remote/protocol/DataPacket.java | 29 ++
.../protocol/socket/SocketClientProtocol.java | 73 ++++-
.../socket/SocketClientTransaction.java | 66 ++++
.../nifi/remote/util/RemoteNiFiUtils.java | 216 +++++++++++++
.../apache/nifi/groups/RemoteProcessGroup.java | 30 --
.../nifi/remote/StandardRemoteProcessGroup.java | 89 +-----
.../util/RemoteProcessGroupUtils.java | 216 -------------
.../nifi/remote/RemoteResourceFactory.java | 8 +
.../nifi/remote/StandardRemoteGroupPort.java | 28 +-
.../socket/SocketFlowFileServerProtocol.java | 9 +-
.../apache/nifi/remote/RemoteDestination.java | 10 -
18 files changed, 822 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/pom.xml b/nifi/commons/site-to-site-client/pom.xml
index 7719d55..d65f440 100644
--- a/nifi/commons/site-to-site-client/pom.xml
+++ b/nifi/commons/site-to-site-client/pom.xml
@@ -21,6 +21,19 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>client-dto</artifactId>
+ <version>0.0.1-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-utils</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
deleted file mode 100644
index ec77f2c..0000000
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client;
-
-import java.io.InputStream;
-import java.util.Map;
-
-public interface DataPacket {
-
- Map<String, String> getAttributes();
-
- InputStream getData();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 47a09be..34cb56a 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,9 +16,12 @@
*/
package org.apache.nifi.remote.client;
+import java.io.Closeable;
import java.io.IOException;
-public interface SiteToSiteClient {
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public interface SiteToSiteClient extends Closeable {
void send(DataPacket dataPacket) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
new file mode 100644
index 0000000..bae6e51
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+public interface Transaction {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index d20fb58..0718bb1 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -41,10 +41,16 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
@@ -72,21 +78,28 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.PeerStatusCache;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EndpointConnectionStatePool {
public static final long PEER_REFRESH_PERIOD = 60000L;
public static final String CATEGORY = "Site-to-Site";
+ public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class);
- private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>();
+ private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>();
private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
-
+ private final URI clusterUrl;
+ private final String apiUri;
+
private final AtomicLong peerIndex = new AtomicLong(0L);
private final ReentrantLock peerRefreshLock = new ReentrantLock();
@@ -98,15 +111,41 @@ public class EndpointConnectionStatePool {
private final File peersFile;
private final EventReporter eventReporter;
private final SSLContext sslContext;
+ private final ScheduledExecutorService taskExecutor;
+
+ private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
+ private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
+ private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
+ private Integer siteToSitePort;
+ private Boolean siteToSiteSecure;
+ private long remoteRefreshTime;
+ private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
+ private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
+
+ private volatile int commsTimeout;
- public EndpointConnectionStatePool(final EventReporter eventReporter, final File persistenceFile) {
- this(null, eventReporter, persistenceFile);
+ public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
+ this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
}
- public EndpointConnectionStatePool(final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+ public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+ try {
+ this.clusterUrl = new URI(clusterUrl);
+ } catch (final URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
+ }
+
+ // Trim the trailing /
+ String uriPath = this.clusterUrl.getPath();
+ if (uriPath.endsWith("/")) {
+ uriPath = uriPath.substring(0, uriPath.length() - 1);
+ }
+ apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
+
this.sslContext = sslContext;
this.peersFile = persistenceFile;
this.eventReporter = eventReporter;
+ this.commsTimeout = commsTimeoutMillis;
Set<PeerStatus> recoveredStatuses;
if ( persistenceFile != null && persistenceFile.exists() ) {
@@ -119,21 +158,39 @@ public class EndpointConnectionStatePool {
} else {
peerStatusCache = null;
}
+
+ // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
+ // connections and keep our list of peers up-to-date.
+ taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thread = defaultFactory.newThread(r);
+ thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
+ return thread;
+ }
+ });
+
+ taskExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ refreshPeers();
+ }
+ }, 0, 5, TimeUnit.SECONDS);
+
+ taskExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ cleanupExpiredSockets();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
}
- public EndpointConnectionState getEndpointConnectionState(final String clusterUrl, final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
//
// Attempt to get a connection state that already exists for this URL.
//
- BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(clusterUrl);
- if ( connectionStateQueue == null ) {
- connectionStateQueue = new LinkedBlockingQueue<>();
- BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(clusterUrl, connectionStateQueue);
- if ( existingQueue != null ) {
- connectionStateQueue = existingQueue;
- }
- }
-
FlowFileCodec codec = null;
CommunicationsSession commsSession = null;
SocketClientProtocol protocol = null;
@@ -172,7 +229,7 @@ public class EndpointConnectionStatePool {
final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
- peer = new Peer(commsSession, peerUrl, clusterUrl);
+ peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
// perform handshake
try {
@@ -214,9 +271,8 @@ public class EndpointConnectionStatePool {
} else {
final long lastTimeUsed = connectionState.getLastTimeUsed();
final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
- final long timeoutMillis = remoteDestination.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
- if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) {
+ if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) {
cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
connectionState = null;
} else {
@@ -243,12 +299,7 @@ public class EndpointConnectionStatePool {
return false;
}
- final BlockingQueue<EndpointConnectionState> queue = endpointConnectionMap.get(url);
- if ( queue == null ) {
- return false;
- }
-
- return queue.offer(endpointConnectionState);
+ return connectionStateQueue.offer(endpointConnectionState);
}
/**
@@ -365,7 +416,7 @@ public class EndpointConnectionStatePool {
}
- public Set<PeerStatus> getPeerStatuses() {
+ private Set<PeerStatus> getPeerStatuses() {
final PeerStatusCache cache = this.peerStatusCache;
if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
return null;
@@ -384,12 +435,12 @@ public class EndpointConnectionStatePool {
return cache.getStatuses();
}
- private Set<PeerStatus> fetchRemotePeerStatuses(final URI destinationUri, final boolean secure) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException {
- final String hostname = destinationUri.getHost();
- final int port = destinationUri.getPort();
+ private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException {
+ final String hostname = clusterUrl.getHost();
+ final int port = getSiteToSitePort();
- final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port, secure);
- final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, destinationUri.toString());
+ final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
+ final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
final SocketClientProtocol clientProtocol = new SocketClientProtocol();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
@@ -399,8 +450,8 @@ public class EndpointConnectionStatePool {
throw new BadRequestException(e.toString());
}
- // TODO: Make the 30000 millis configurable
- clientProtocol.handshake(peer, null, 30000);
+ clientProtocol.setTimeout(commsTimeout);
+ clientProtocol.handshake(peer, null);
final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
persistPeerStatuses(peerStatuses);
@@ -474,38 +525,41 @@ public class EndpointConnectionStatePool {
}
- public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
- return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort(), peerStatus.isSecure());
+ private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
+ return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
}
- public CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port, final boolean secure) throws IOException {
+ private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
+ if ( siteToSiteSecure == null ) {
+ throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
+ }
+
final String destinationUri = "nifi://" + hostname + ":" + port;
CommunicationsSession commsSession = null;
try {
- if ( secure ) {
- if ( sslContext == null ) {
- throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
- }
-
- final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
- socketChannel.connect();
-
- commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
- try {
- commsSession.setUserDn(socketChannel.getDn());
- } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
- throw new IOException(ex);
- }
- } else {
- final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
- commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
- }
-
- commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
- commsSession.setUri(destinationUri);
+ if ( siteToSiteSecure ) {
+ if ( sslContext == null ) {
+ throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
+ }
+
+ final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+ socketChannel.connect();
+
+ commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+
+ try {
+ commsSession.setUserDn(socketChannel.getDn());
+ } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
+ throw new IOException(ex);
+ }
+ } else {
+ final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+ commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
+ }
+
+ commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+ commsSession.setUri(destinationUri);
} catch (final IOException ioe) {
if ( commsSession != null ) {
commsSession.close();
@@ -578,59 +632,52 @@ public class EndpointConnectionStatePool {
}
- public void cleanupExpiredSockets() {
+ private void cleanupExpiredSockets() {
final List<EndpointConnectionState> states = new ArrayList<>();
- for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) {
- states.clear();
-
- EndpointConnectionState state;
- while ((state = queue.poll()) != null) {
- // If the socket has not been used in 10 seconds, shut it down.
- final long lastUsed = state.getLastTimeUsed();
- if ( lastUsed < System.currentTimeMillis() - 10000L ) {
- try {
- state.getSocketClientProtocol().shutdown(state.getPeer());
- } catch (final Exception e) {
- logger.debug("Failed to shut down {} using {} due to {}",
- new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
- }
-
- cleanup(state.getSocketClientProtocol(), state.getPeer());
- } else {
- states.add(state);
+ EndpointConnectionState state;
+ while ((state = connectionStateQueue.poll()) != null) {
+ // If the socket has not been used in 10 seconds, shut it down.
+ final long lastUsed = state.getLastTimeUsed();
+ if ( lastUsed < System.currentTimeMillis() - 10000L ) {
+ try {
+ state.getSocketClientProtocol().shutdown(state.getPeer());
+ } catch (final Exception e) {
+ logger.debug("Failed to shut down {} using {} due to {}",
+ new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
}
+
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
+ } else {
+ states.add(state);
}
-
- queue.addAll(states);
}
+
+ connectionStateQueue.addAll(states);
}
public void shutdown() {
+ taskExecutor.shutdown();
peerTimeoutExpirations.clear();
for ( final CommunicationsSession commsSession : activeCommsChannels ) {
commsSession.interrupt();
}
- for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) {
- EndpointConnectionState state;
- while ( (state = queue.poll()) != null) {
- cleanup(state.getSocketClientProtocol(), state.getPeer());
- }
+ EndpointConnectionState state;
+ while ( (state = connectionStateQueue.poll()) != null) {
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
}
-
- endpointConnectionMap.clear();
}
- public void refreshPeers(final URI targetUri, final boolean secure) {
+ private void refreshPeers() {
final PeerStatusCache existingCache = peerStatusCache;
if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
return;
}
try {
- final Set<PeerStatus> statuses = fetchRemotePeerStatuses(targetUri, secure);
+ final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
peerStatusCache = new PeerStatusCache(statuses);
logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
} catch (Exception e) {
@@ -639,6 +686,92 @@ public class EndpointConnectionStatePool {
logger.warn("", e);
}
}
+ }
+
+
+ public String getInputPortIdentifier(final String portName) throws IOException {
+ return getPortIdentifier(portName, inputPortMap);
+ }
+
+ public String getOutputPortIdentifier(final String portName) throws IOException {
+ return getPortIdentifier(portName, outputPortMap);
+ }
+
+
+ private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException {
+ String identifier;
+ remoteInfoReadLock.lock();
+ try {
+ identifier = portMap.get(portName);
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ if ( identifier != null ) {
+ return identifier;
+ }
+
+ refreshRemoteInfo();
+
+ remoteInfoReadLock.lock();
+ try {
+ return portMap.get(portName);
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+ }
+
+
+ private ControllerDTO refreshRemoteInfo() throws IOException {
+ final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null);
+ final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
+
+ remoteInfoWriteLock.lock();
+ try {
+ this.siteToSitePort = controller.getRemoteSiteListeningPort();
+ this.siteToSiteSecure = controller.isSiteToSiteSecure();
+
+ inputPortMap.clear();
+ for (final PortDTO inputPort : controller.getInputPorts()) {
+ inputPortMap.put(inputPort.getName(), inputPort.getId());
+ }
+
+ outputPortMap.clear();
+ for ( final PortDTO outputPort : controller.getOutputPorts()) {
+ outputPortMap.put(outputPort.getName(), outputPort.getId());
+ }
+
+ this.remoteRefreshTime = System.currentTimeMillis();
+ } finally {
+ remoteInfoWriteLock.unlock();
+ }
+
+ return controller;
+ }
+
+ /**
+ * @return the port that the remote instance is listening on for
+ * site-to-site communication, or <code>null</code> if the remote instance
+ * is not configured to allow site-to-site communications.
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ private Integer getSiteToSitePort() throws IOException {
+ Integer listeningPort;
+ remoteInfoReadLock.lock();
+ try {
+ listeningPort = this.siteToSitePort;
+ if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return listeningPort;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ listeningPort = controller.getRemoteSiteListeningPort();
+ return listeningPort;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 48e9cc5..b81b425 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -16,16 +16,87 @@
*/
package org.apache.nifi.remote.client.socket;
+import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
-import org.apache.nifi.remote.client.DataPacket;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.protocol.DataPacket;
public class SocketClient implements SiteToSiteClient {
-
+ private final EndpointConnectionStatePool pool;
+ private final boolean compress;
+ private final String portName;
+ private final long penalizationNanos;
+ private volatile String portIdentifier;
+
+ private SocketClient(final Builder builder) {
+ pool = new EndpointConnectionStatePool(builder.url, (int) TimeUnit.NANOSECONDS.toMillis(builder.timeoutNanos),
+ builder.sslContext, builder.eventReporter, builder.peerPersistenceFile);
+
+ this.compress = builder.useCompression;
+ this.portIdentifier = builder.portIdentifier;
+ this.portName = builder.portName;
+ this.penalizationNanos = builder.penalizationNanos;
+ }
+
+
+ private String getPortIdentifier(final TransferDirection direction) throws IOException {
+ final String id = this.portIdentifier;
+ if ( id != null ) {
+ return id;
+ }
+
+ if ( direction == TransferDirection.SEND ) {
+ return pool.getInputPortIdentifier(this.portName);
+ } else {
+ return pool.getOutputPortIdentifier(this.portName);
+ }
+ }
+
+
@Override
public void send(final DataPacket dataPacket) throws IOException {
- // TODO Auto-generated method stub
+ final String portId = getPortIdentifier(TransferDirection.SEND);
+
+ if ( portId == null ) {
+ throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance");
+ }
+
+ final RemoteDestination remoteDestination = new RemoteDestination() {
+ @Override
+ public String getIdentifier() {
+ return portId;
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public boolean isUseCompression() {
+ return compress;
+ }
+ };
+
+ final EndpointConnectionState connectionState;
+ try {
+ connectionState = pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND);
+ } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
+ throw new IOException(e);
+ }
+
+
}
@Override
@@ -33,5 +104,79 @@ public class SocketClient implements SiteToSiteClient {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public void close() throws IOException {
+ pool.shutdown();
+ }
+
+ public static class Builder {
+ private String url;
+ private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+ private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+ private SSLContext sslContext;
+ private EventReporter eventReporter;
+ private File peerPersistenceFile;
+ private boolean useCompression;
+ private String portName;
+ private String portIdentifier;
+
+ public Builder url(final String url) {
+ this.url = url;
+ return this;
+ }
+
+ public Builder timeout(final long timeout, final TimeUnit unit) {
+ this.timeoutNanos = unit.toNanos(timeout);
+ return this;
+ }
+
+ public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
+ this.penalizationNanos = unit.toNanos(period);
+ return this;
+ }
+
+ public Builder sslContext(final SSLContext sslContext) {
+ this.sslContext = sslContext;
+ return this;
+ }
+
+ public Builder eventReporter(final EventReporter eventReporter) {
+ this.eventReporter = eventReporter;
+ return this;
+ }
+
+ public Builder peerPersistenceFile(final File peerPersistenceFile) {
+ this.peerPersistenceFile = peerPersistenceFile;
+ return this;
+ }
+
+ public Builder useCompression(final boolean compress) {
+ this.useCompression = compress;
+ return this;
+ }
+
+ public Builder portName(final String portName) {
+ this.portName = portName;
+ return this;
+ }
+
+ public Builder portIdentifier(final String portIdentifier) {
+ this.portIdentifier = portIdentifier;
+ return this;
+ }
+
+ public SocketClient build() {
+ if ( url == null ) {
+ throw new IllegalStateException("Must specify URL to build Site-to-Site client");
+ }
+
+ if ( portName == null && portIdentifier == null ) {
+ throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
+ }
+
+ return new SocketClient(this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 32274eb..d817425 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
@@ -46,6 +47,20 @@ public interface ClientProtocol extends VersionedRemoteResource {
boolean isReadyForFileTransfer();
+
+
+
+ void startTransaction(Peer peer, TransferDirection direction) throws IOException;
+
+ void completeTransaction();
+
+ void rollbackTransaction();
+
+ void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, ProtocolException;
+
+
/**
* returns <code>true</code> if remote instance indicates that the port is
* invalid
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000..f4fa4d0
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+public interface DataPacket {
+
+ Map<String, String> getAttributes();
+
+ InputStream getData();
+
+ long getSize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 560385c..6b0c94b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -41,7 +41,9 @@ import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.client.Transaction;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
@@ -50,6 +52,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.ClientProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
@@ -60,7 +63,7 @@ public class SocketClientProtocol implements ClientProtocol {
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
private RemoteDestination destination;
- private boolean useCompression;
+ private boolean useCompression = false;
private String commsIdentifier;
private boolean handshakeComplete = false;
@@ -70,6 +73,7 @@ public class SocketClientProtocol implements ClientProtocol {
private Response handshakeResponse = null;
private boolean readyForFileTransfer = false;
private String transitUriPrefix = null;
+ private int timeoutMillis = 30000;
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
@@ -81,13 +85,16 @@ public class SocketClientProtocol implements ClientProtocol {
this.useCompression = destination.isUseCompression();
}
+ public void setTimeout(final int timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
+ }
@Override
public void handshake(final Peer peer) throws IOException, HandshakeException {
- handshake(peer, destination.getIdentifier(), (int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+ handshake(peer, destination.getIdentifier());
}
- public void handshake(final Peer peer, final String destinationId, final int timeoutMillis) throws IOException, HandshakeException {
+ public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException {
if ( handshakeComplete ) {
throw new IllegalStateException("Handshake has already been completed");
}
@@ -228,6 +235,65 @@ public class SocketClientProtocol implements ClientProtocol {
return codec;
}
+
+ // TODO: move up to top with member variables
+ private SocketClientTransaction transaction;
+
+ @Override
+ public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+ if ( !readyForFileTransfer ) {
+ throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
+ }
+
+ transaction = new SocketClientTransaction(peer, direction, useCompression);
+
+ final DataOutputStream dos = transaction.getDataOutputStream();
+ if ( direction == TransferDirection.RECEIVE ) {
+ // Indicate that we would like to have some data
+ RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+ } else {
+ // Indicate that we would like to have some data
+ RequestType.SEND_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+ }
+ }
+
+ @Override
+ public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( transaction == null ) {
+ throw new IllegalStateException("Cannot receive data because no transaction has been started");
+ }
+
+ final Peer peer = transaction.getPeer();
+ logger.debug("{} Receiving FlowFiles from {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ String userDn = commsSession.getUserDn();
+ if ( userDn == null ) {
+ userDn = "none";
+ }
+
+ // Determine if Peer will send us data or has no data to send us
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case MORE_DATA:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ break;
+ case NO_MORE_DATA:
+ logger.debug("{} No data available from {}", peer);
+ return null;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+
+
+ }
+
@Override
public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
@@ -258,6 +324,7 @@ public class SocketClientProtocol implements ClientProtocol {
logger.debug("{} {} Indicates that data is available", this, peer);
break;
case NO_MORE_DATA:
+ context.yield();
logger.debug("{} No data available from {}", peer);
return;
default:
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
new file mode 100644
index 0000000..0c4ce05
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.Transaction;
+import org.apache.nifi.remote.io.CompressionInputStream;
+
+public class SocketClientTransaction implements Transaction {
+ private final long startTime = System.nanoTime();
+ private long bytesReceived = 0L;
+ private CRC32 crc = new CRC32();
+
+ private final Peer peer;
+ private final TransferDirection direction;
+
+ private final DataInputStream dis;
+ private final DataOutputStream dos;
+ private final CheckedInputStream checkedInputStream;
+
+ SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
+ this.peer = peer;
+ this.direction = direction;
+
+ this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+ this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+
+ final InputStream dataInputStream = useCompression ? new CompressionInputStream(dis) : dis;
+ checkedInputStream = new CheckedInputStream(dataInputStream, crc);
+ }
+
+ CheckedInputStream getCheckedInputStream() {
+ return checkedInputStream;
+ }
+
+ DataOutputStream getDataOutputStream() {
+ return dos;
+ }
+
+ Peer getPeer() {
+ return peer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
new file mode 100644
index 0000000..b2dbdcd
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ *
+ */
+public class RemoteNiFiUtils {
+
+ public static final String CONTROLLER_URI_PATH = "/controller";
+
+ private static final int CONNECT_TIMEOUT = 10000;
+ private static final int READ_TIMEOUT = 10000;
+
+ private final Client client;
+
+ public RemoteNiFiUtils(final SSLContext sslContext) {
+ this.client = getClient(sslContext);
+ }
+
+
+ /**
+ * Gets the content at the specified URI.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+ return get(uri, timeoutMillis, null);
+ }
+
+ /**
+ * Gets the content at the specified URI using the given query parameters.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @param queryParams
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+ // perform the request
+ WebResource webResource = client.resource(uri);
+ if ( queryParams != null ) {
+ for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
+ webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+ }
+ }
+
+ webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+
+ return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ }
+
+ /**
+ * Performs a HEAD request to the specified URI.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+ // perform the request
+ WebResource webResource = client.resource(uri);
+ webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+ return webResource.head();
+ }
+
+ /**
+ * Gets a client based on the specified URI.
+ *
+ * @param uri
+ * @return
+ */
+ private Client getClient(final SSLContext sslContext) {
+ final Client client;
+ if (sslContext == null) {
+ client = WebUtils.createClient(null);
+ } else {
+ client = WebUtils.createClient(null, sslContext);
+ }
+
+ client.setReadTimeout(READ_TIMEOUT);
+ client.setConnectTimeout(CONNECT_TIMEOUT);
+
+ return client;
+ }
+
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+ * @param timeoutMillis
+ * @return
+ * @throws IOException
+ */
+ public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteListeningPort(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteRootGroupId(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getController(uriObject, timeoutMillis).getInstanceId();
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the full URI to fetch, including the path.
+ * @return
+ * @throws IOException
+ */
+ private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+ }
+
+ private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getId();
+ }
+
+ public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+ final ClientResponse response = get(uri, timeoutMillis);
+
+ if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+ final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+ return entity.getController();
+ } else {
+ final String responseMessage = response.getEntity(String.class);
+ throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+ }
+ }
+
+ /**
+ * Issues a registration request on behalf of the current user.
+ *
+ * @param baseApiUri
+ * @return
+ */
+ public ClientResponse issueRegistrationRequest(String baseApiUri) {
+ final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+ // set up the query params
+ MultivaluedMapImpl entity = new MultivaluedMapImpl();
+ entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+
+ // create the web resource
+ WebResource webResource = client.resource(uri);
+
+ // get the client utils and make the request
+ return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 9f2dac8..ac41cba 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.groups;
-import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.Set;
@@ -109,15 +108,6 @@ public interface RemoteProcessGroup {
String getCommunicationsTimeout();
/**
- * @return the port that the remote instance is listening on for
- * site-to-site communication, or <code>null</code> if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- Integer getListeningPort() throws IOException;
-
- /**
* Indicates whether or not the RemoteProcessGroup is currently scheduled to
* transmit data
*
@@ -229,24 +219,4 @@ public interface RemoteProcessGroup {
void verifyCanStopTransmitting();
void verifyCanUpdate();
-
- /**
- * Returns a set of PeerStatus objects that describe the different peers
- * that we can communicate with for this RemoteProcessGroup.
- *
- * If the destination is a cluster, this set will contain PeerStatuses for
- * each of the nodes in the cluster.
- *
- * If the destination is a standalone instance, this set will contain just a
- * PeerStatus for the destination.
- *
- * Once the PeerStatuses have been obtained, they may be cached by this
- * RemoteProcessGroup for some amount of time.
- *
- * If unable to obtain the PeerStatuses or no peer status has yet been
- * obtained, will return null.
- *
- * @return
- */
- Set<PeerStatus> getPeerStatuses();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 857add9..db0aeb7 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -19,7 +19,6 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
import java.io.File;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -50,7 +49,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.util.RemoteProcessGroupUtils;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
@@ -59,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
@@ -85,7 +84,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public static final String CONTROLLER_URI_PATH = "/controller";
public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
- public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
// status codes
public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
@@ -127,9 +125,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private ProcessGroupCounts counts = new ProcessGroupCounts(0, 0, 0, 0, 0, 0, 0, 0);
private Long refreshContentsTimestamp = null;
- private Integer listeningPort;
- private long listeningPortRetrievalTime = 0L;
private Boolean destinationSecure;
+ private Integer listeningPort;
private volatile String authorizationIssue;
@@ -175,48 +172,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
- endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile());
+ endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS),
+ sslContext, eventReporter, getPeerPersistenceFile());
- final Runnable socketCleanup = new Runnable() {
- @Override
- public void run() {
- final Set<StandardRemoteGroupPort> ports = new HashSet<>();
- readLock.lock();
- try {
- ports.addAll(inputPorts.values());
- ports.addAll(outputPorts.values());
- } finally {
- readLock.unlock();
- }
-
- endpointConnectionPool.cleanupExpiredSockets();
- }
- };
-
- final Runnable refreshPeers = new Runnable() {
- @Override
- public void run() {
- final boolean secure;
- try {
- secure = isSecure();
- } catch (CommunicationsException e) {
- logger.warn("{} Unable to determine if remote instance {} is configured for secure site-to-site due to {}; will not refresh list of peers", new Object[] {this, getTargetUri(), e.toString()});
- if ( logger.isDebugEnabled() ) {
- logger.warn("", e);
- }
- return;
- }
-
- endpointConnectionPool.refreshPeers(getTargetUri(), secure);
- }
- };
-
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS);
- backgroundThreadExecutor.scheduleWithFixedDelay(refreshPeers, 0, 5, TimeUnit.SECONDS);
- backgroundThreadExecutor.scheduleWithFixedDelay(socketCleanup, 10L, 10L, TimeUnit.SECONDS);
}
@Override
@@ -810,7 +772,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return;
}
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final String uriVal = apiUri.toString() + CONTROLLER_URI_PATH;
URI uri;
try {
@@ -950,39 +912,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return descriptor;
}
- /**
- * @return the port that the remote instance is listening on for
- * site-to-site communication, or <code>null</code> if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- @Override
- public Integer getListeningPort() throws IOException {
- Integer listeningPort;
- readLock.lock();
- try {
- listeningPort = this.listeningPort;
- if (listeningPort != null && this.listeningPortRetrievalTime > System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) {
- return listeningPort;
- }
- } finally {
- readLock.unlock();
- }
-
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
- listeningPort = utils.getRemoteListeningPort(apiUri.toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
- writeLock.lock();
- try {
- this.listeningPort = listeningPort;
- this.listeningPortRetrievalTime = System.currentTimeMillis();
- } finally {
- writeLock.unlock();
- }
-
- return listeningPort;
- }
-
@Override
public boolean isTransmitting() {
return transmitting.get();
@@ -1218,7 +1147,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void run() {
try {
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
final int statusCode = response.getStatus();
@@ -1398,12 +1327,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- @Override
- public Set<PeerStatus> getPeerStatuses() {
- return endpointConnectionPool.getPeerStatuses();
- }
-
-
private File getPeerPersistenceFile() {
final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
return new File(stateDir, getIdentifier() + ".peers");
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
deleted file mode 100644
index 10208f8..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.util;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.entity.ControllerEntity;
-import org.apache.nifi.web.util.WebUtils;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.UniformInterfaceException;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-
-/**
- *
- */
-public class RemoteProcessGroupUtils {
-
- public static final String CONTROLLER_URI_PATH = "/controller";
-
- private static final int CONNECT_TIMEOUT = 10000;
- private static final int READ_TIMEOUT = 10000;
-
- private final Client client;
-
- public RemoteProcessGroupUtils(final SSLContext sslContext) {
- this.client = getClient(sslContext);
- }
-
-
- /**
- * Gets the content at the specified URI.
- *
- * @param uri
- * @param timeoutMillis
- * @return
- * @throws ClientHandlerException
- * @throws UniformInterfaceException
- */
- public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
- return get(uri, timeoutMillis, null);
- }
-
- /**
- * Gets the content at the specified URI using the given query parameters.
- *
- * @param uri
- * @param timeoutMillis
- * @param queryParams
- * @return
- * @throws ClientHandlerException
- * @throws UniformInterfaceException
- */
- public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
- // perform the request
- WebResource webResource = client.resource(uri);
- if ( queryParams != null ) {
- for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
- webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
- }
- }
-
- webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
- webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
-
- return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- }
-
- /**
- * Performs a HEAD request to the specified URI.
- *
- * @param uri
- * @param timeoutMillis
- * @return
- * @throws ClientHandlerException
- * @throws UniformInterfaceException
- */
- public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
- // perform the request
- WebResource webResource = client.resource(uri);
- webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
- webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
- return webResource.head();
- }
-
- /**
- * Gets a client based on the specified URI.
- *
- * @param uri
- * @return
- */
- private Client getClient(final SSLContext sslContext) {
- final Client client;
- if (sslContext == null) {
- client = WebUtils.createClient(null);
- } else {
- client = WebUtils.createClient(null, sslContext);
- }
-
- client.setReadTimeout(READ_TIMEOUT);
- client.setConnectTimeout(CONNECT_TIMEOUT);
-
- return client;
- }
-
-
- /**
- * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
- * is not configured to use Site-to-Site transfers.
- *
- * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
- * @param timeoutMillis
- * @return
- * @throws IOException
- */
- public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
- try {
- final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
- return getRemoteListeningPort(uriObject, timeoutMillis);
- } catch (URISyntaxException e) {
- throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
- }
- }
-
- public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
- try {
- final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
- return getRemoteRootGroupId(uriObject, timeoutMillis);
- } catch (URISyntaxException e) {
- throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
- }
- }
-
- public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
- try {
- final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
- return getController(uriObject, timeoutMillis).getInstanceId();
- } catch (URISyntaxException e) {
- throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
- }
- }
-
- /**
- * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
- * is not configured to use Site-to-Site transfers.
- *
- * @param uri the full URI to fetch, including the path.
- * @return
- * @throws IOException
- */
- private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
- return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
- }
-
- private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
- return getController(uri, timeoutMillis).getId();
- }
-
- private ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
- final ClientResponse response = get(uri, timeoutMillis);
-
- if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
- final ControllerEntity entity = response.getEntity(ControllerEntity.class);
- return entity.getController();
- } else {
- final String responseMessage = response.getEntity(String.class);
- throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
- }
- }
-
- /**
- * Issues a registration request on behalf of the current user.
- *
- * @param baseApiUri
- * @return
- */
- public ClientResponse issueRegistrationRequest(String baseApiUri) {
- final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
-
- // set up the query params
- MultivaluedMapImpl entity = new MultivaluedMapImpl();
- entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
-
- // create the web resource
- WebResource webResource = client.resource(uri);
-
- // get the client utils and make the request
- return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 922d4e7..2b27de2 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -56,6 +56,14 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
}
}
+ public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
+ dis.readUTF(); // read codec name
+ dis.readInt(); // read codec version
+
+ dos.write(ABORT);
+ dos.writeUTF(explanation);
+ dos.flush();
+ }
@SuppressWarnings("unchecked")
public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 77ac1a9..82d8206 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -17,7 +17,6 @@
package org.apache.nifi.remote;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -33,7 +32,6 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
@@ -144,7 +142,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final EndpointConnectionState connectionState;
try {
- connectionState = connectionStatePool.getEndpointConnectionState(url, this, transferDirection);
+ connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@@ -366,28 +364,4 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public boolean isSideEffectFree() {
return false;
}
-
- @Override
- public String getDescription() {
- return toString();
- }
-
- @Override
- public long getCommunicationsTimeout(final TimeUnit timeUnit) {
- return getRemoteProcessGroup().getCommunicationsTimeout(timeUnit);
- }
-
- @Override
- public URI getTargetUri() {
- return remoteGroup.getTargetUri();
- }
-
- @Override
- public boolean isSecure() {
- try {
- return remoteGroup.isSecure();
- } catch (final CommunicationsException ce) {
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 647b45c..887429c 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -204,11 +204,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
}
- if ( port == null ) {
- logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing");
- ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name());
- throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name());
- }
// send "OK" response
if ( !responseWritten ) {
@@ -243,6 +238,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ if ( port == null ) {
+ RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
+ }
+
// Negotiate the FlowFileCodec to use.
try {
negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index 94de86b..8c972f7 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -16,21 +16,11 @@
*/
package org.apache.nifi.remote;
-import java.net.URI;
import java.util.concurrent.TimeUnit;
public interface RemoteDestination {
-
- String getDescription();
-
String getIdentifier();
- URI getTargetUri();
-
- boolean isSecure();
-
- long getCommunicationsTimeout(TimeUnit timeUnit);
-
long getYieldPeriod(TimeUnit timeUnit);
boolean isUseCompression();