You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/16 20:18:40 UTC
incubator-nifi git commit: NIFI-282: Refactored to remove Jersey
client from dependencies; made site-to-site config serializable;
allowed SiteToSiteClient.Builder to build a SiteToSiteClientConfig without
building the client itself.
Repository: incubator-nifi
Updated Branches:
refs/heads/nifi-site-to-site-client 8f0402fbb -> e16fc7972
NIFI-282: Refactored to remove Jersey client from dependencies; made site-to-site config serializable; allowed SiteToSiteClient.Builder to build a SiteToSiteClientConfig without building the client itself.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e16fc797
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e16fc797
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e16fc797
Branch: refs/heads/nifi-site-to-site-client
Commit: e16fc7972c24a04d8212e26f66fdcb6e940ffe86
Parents: 8f0402f
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 16 14:18:24 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 16 14:18:24 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/remote/RemoteDestination.java | 8 +-
.../nifi-site-to-site-client/pom.xml | 80 +++----
.../nifi/remote/client/SiteToSiteClient.java | 156 ++++++++------
.../remote/client/SiteToSiteClientConfig.java | 3 +-
.../client/socket/EndpointConnectionPool.java | 83 +++++--
.../nifi/remote/client/socket/SocketClient.java | 10 +-
.../nifi/remote/util/NiFiRestApiUtil.java | 98 +++++++++
.../nifi/remote/util/RemoteNiFiUtils.java | 216 -------------------
.../client/socket/TestSiteToSiteClient.java | 17 +-
.../org/apache/nifi/remote/RemoteNiFiUtils.java | 216 +++++++++++++++++++
.../nifi/remote/StandardRemoteProcessGroup.java | 1 -
11 files changed, 533 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index f718581..508ab37 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -29,7 +29,13 @@ public interface RemoteDestination {
* @return
*/
String getIdentifier();
-
+
+ /**
+ * Returns the human-readable name of the remote destination
+ * @return
+ */
+ String getName();
+
/**
* Returns the amount of time that system should pause sending to a particular node if unable to
* send data to or receive data from this endpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
index 3fc00a2..0d21a3d 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -1,43 +1,45 @@
<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons</artifactId>
- <version>0.0.2-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-site-to-site-client</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
- <dependency>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-client-dto</artifactId>
+ <artifactId>nifi-commons</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-web-utils</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ </parent>
+
+ <artifactId>nifi-site-to-site-client</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <!-- <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-client</artifactId>
+ </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-web-utils</artifactId>
+ </dependency> -->
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-client-dto</artifactId>
+ <version>0.0.2-incubating-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 0591b5a..5f84382 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -19,6 +19,7 @@ package org.apache.nifi.remote.client;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -122,8 +123,10 @@ public interface SiteToSiteClient extends Closeable {
* and a new client created.
* </p>
*/
- public static class Builder {
- private String url;
+ public static class Builder implements Serializable {
+ private static final long serialVersionUID = -4954962284343090219L;
+
+ private String url;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
@@ -309,10 +312,89 @@ public interface SiteToSiteClient extends Closeable {
return this;
}
+ /**
+ * Returns a {@link SiteToSiteClientConfig} for the configured values but does not create a SiteToSiteClient
+ * @return
+ */
+ public SiteToSiteClientConfig buildConfig() {
+ final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+ private static final long serialVersionUID = 1323119754841633818L;
+
+ @Override
+ public boolean isUseCompression() {
+ return Builder.this.isUseCompression();
+ }
+
+ @Override
+ public String getUrl() {
+ return Builder.this.getUrl();
+ }
+
+ @Override
+ public long getTimeout(final TimeUnit timeUnit) {
+ return Builder.this.getTimeout(timeUnit);
+ }
+
+ @Override
+ public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+ return Builder.this.getIdleConnectionExpiration(timeUnit);
+ }
+
+ @Override
+ public SSLContext getSslContext() {
+ return Builder.this.getSslContext();
+ }
+
+ @Override
+ public String getPortName() {
+ return Builder.this.getPortName();
+ }
+
+ @Override
+ public String getPortIdentifier() {
+ return Builder.this.getPortIdentifier();
+ }
+
+ @Override
+ public long getPenalizationPeriod(final TimeUnit timeUnit) {
+ return Builder.this.getPenalizationPeriod(timeUnit);
+ }
+
+ @Override
+ public File getPeerPersistenceFile() {
+ return Builder.this.getPeerPersistenceFile();
+ }
+
+ @Override
+ public EventReporter getEventReporter() {
+ return Builder.this.getEventReporter();
+ }
+
+ @Override
+ public long getPreferredBatchDuration(final TimeUnit timeUnit) {
+ return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public long getPreferredBatchSize() {
+ return Builder.this.batchSize;
+ }
+
+ @Override
+ public int getPreferredBatchCount() {
+ return Builder.this.batchCount;
+ }
+ };
+
+ return config;
+ }
/**
* Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi
* @return
+ *
+ * @throws IllegalStateException if either the url is not set or neither the port name nor port identifier
+ * is set.
*/
public SiteToSiteClient build() {
if ( url == null ) {
@@ -323,75 +405,7 @@ public interface SiteToSiteClient extends Closeable {
throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
}
- final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
-
- @Override
- public boolean isUseCompression() {
- return Builder.this.isUseCompression();
- }
-
- @Override
- public String getUrl() {
- return Builder.this.getUrl();
- }
-
- @Override
- public long getTimeout(final TimeUnit timeUnit) {
- return Builder.this.getTimeout(timeUnit);
- }
-
- @Override
- public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
- return Builder.this.getIdleConnectionExpiration(timeUnit);
- }
-
- @Override
- public SSLContext getSslContext() {
- return Builder.this.getSslContext();
- }
-
- @Override
- public String getPortName() {
- return Builder.this.getPortName();
- }
-
- @Override
- public String getPortIdentifier() {
- return Builder.this.getPortIdentifier();
- }
-
- @Override
- public long getPenalizationPeriod(final TimeUnit timeUnit) {
- return Builder.this.getPenalizationPeriod(timeUnit);
- }
-
- @Override
- public File getPeerPersistenceFile() {
- return Builder.this.getPeerPersistenceFile();
- }
-
- @Override
- public EventReporter getEventReporter() {
- return Builder.this.getEventReporter();
- }
-
- @Override
- public long getPreferredBatchDuration(final TimeUnit timeUnit) {
- return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
- }
-
- @Override
- public long getPreferredBatchSize() {
- return Builder.this.batchSize;
- }
-
- @Override
- public int getPreferredBatchCount() {
- return Builder.this.batchCount;
- }
- };
-
- return new SocketClient(config);
+ return new SocketClient(buildConfig());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index d03ab3c..5e7fbe8 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -17,6 +17,7 @@
package org.apache.nifi.remote.client;
import java.io.File;
+import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -24,7 +25,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.protocol.DataPacket;
-public interface SiteToSiteClientConfig {
+public interface SiteToSiteClientConfig extends Serializable {
/**
* Returns the configured URL for the remote NiFi instance
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index c0e4761..f9a8a38 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -79,8 +79,8 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.util.NiFiRestApiUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
-import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.web.api.dto.ControllerDTO;
@@ -201,6 +201,17 @@ public class EndpointConnectionPool {
}, 5, 5, TimeUnit.SECONDS);
}
+ private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {
+ if ( remoteDestination.getIdentifier() != null ) {
+ return remoteDestination.getIdentifier();
+ }
+
+ if ( transferDirection == TransferDirection.RECEIVE ) {
+ return getOutputPortIdentifier(remoteDestination.getName());
+ } else {
+ return getInputPortIdentifier(remoteDestination.getName());
+ }
+ }
public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnection(direction, null);
@@ -222,14 +233,15 @@ public class EndpointConnectionPool {
do {
connection = connectionQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
+ final String portId = getPortIdentifier(direction);
if ( connection == null && !addBack.isEmpty() ) {
// all available connections have been penalized.
- logger.debug("{} all Connections for {} are penalized; returning no Connection", this, remoteDestination.getIdentifier());
+ logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
return null;
}
- if ( connection != null && connection.getPeer().isPenalized(remoteDestination.getIdentifier()) ) {
+ if ( connection != null && connection.getPeer().isPenalized(portId) ) {
// we have a connection, but it's penalized. We want to add it back to the queue
// when we've found one to use.
addBack.add(connection);
@@ -238,9 +250,9 @@ public class EndpointConnectionPool {
// if we can't get an existing Connection, create one
if ( connection == null ) {
- logger.debug("{} No Connection available for Port {}; creating new Connection", this, remoteDestination.getIdentifier());
+ logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
protocol = new SocketClientProtocol();
- protocol.setDestination(remoteDestination);
+ protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction);
@@ -249,11 +261,12 @@ public class EndpointConnectionPool {
return null;
}
+ final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
try {
logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) {
- penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ penalize(peerStatus, penalizationMillis);
throw ioe;
}
@@ -289,17 +302,17 @@ public class EndpointConnectionPool {
// handle error cases
if ( protocol.isDestinationFull() ) {
logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ penalize(peer, penalizationMillis);
connectionQueue.offer(connection);
continue;
} else if ( protocol.isPortInvalid() ) {
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ penalize(peer, penalizationMillis);
cleanup(protocol, peer);
- throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running");
+ throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running");
} else if ( protocol.isPortUnknown() ) {
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ penalize(peer, penalizationMillis);
cleanup(protocol, peer);
- throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known");
+ throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known");
}
// negotiate the FlowFileCodec to use
@@ -309,7 +322,7 @@ public class EndpointConnectionPool {
} catch (final PortNotRunningException | UnknownPortException e) {
throw e;
} catch (final Exception e) {
- penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ penalize(peer, penalizationMillis);
cleanup(protocol, peer);
final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
@@ -539,7 +552,16 @@ public class EndpointConnectionPool {
clientProtocol.setTimeout(commsTimeout);
if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
- clientProtocol.handshake(peer, remoteDestination.getIdentifier());
+ String portId = getPortIdentifier(TransferDirection.RECEIVE);
+ if ( portId == null ) {
+ portId = getPortIdentifier(TransferDirection.SEND);
+ }
+
+ if ( portId == null ) {
+ peer.close();
+ throw new IOException("Failed to determine the identifier of port " + remoteDestination.getName());
+ }
+ clientProtocol.handshake(peer, portId);
} else {
clientProtocol.handshake(peer, null);
}
@@ -818,8 +840,8 @@ public class EndpointConnectionPool {
private ControllerDTO refreshRemoteInfo() throws IOException {
final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
- final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null);
- final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
+ final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? sslContext : null);
+ final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout);
remoteInfoWriteLock.lock();
try {
@@ -898,4 +920,35 @@ public class EndpointConnectionPool {
return isSecure;
}
+
+
+ private class IdEnrichedRemoteDestination implements RemoteDestination {
+ private final RemoteDestination original;
+ private final String identifier;
+
+ public IdEnrichedRemoteDestination(final RemoteDestination original, final String identifier) {
+ this.original = original;
+ this.identifier = identifier;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public String getName() {
+ return original.getName();
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return original.getYieldPeriod(timeUnit);
+ }
+
+ @Override
+ public boolean isUseCompression() {
+ return original.isUseCompression();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 016e67f..c11c2ab 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -43,7 +43,8 @@ public class SocketClient implements SiteToSiteClient {
private volatile boolean closed = false;
public SocketClient(final SiteToSiteClientConfig config) {
- pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()),
+ pool = new EndpointConnectionPool(config.getUrl(),
+ createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
(int) config.getTimeout(TimeUnit.MILLISECONDS),
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
@@ -88,12 +89,17 @@ public class SocketClient implements SiteToSiteClient {
}
- private RemoteDestination createRemoteDestination(final String portId) {
+ private RemoteDestination createRemoteDestination(final String portId, final String portName) {
return new RemoteDestination() {
@Override
public String getIdentifier() {
return portId;
}
+
+ @Override
+ public String getName() {
+ return portName;
+ }
@Override
public long getYieldPeriod(final TimeUnit timeUnit) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
new file mode 100644
index 0000000..10352ec
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class NiFiRestApiUtil {
+ public static final int RESPONSE_CODE_OK = 200;
+
+ private final SSLContext sslContext;
+
+ public NiFiRestApiUtil(final SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException {
+ final URL url = new URL(connUrl);
+ final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setConnectTimeout(timeoutMillis);
+ connection.setReadTimeout(timeoutMillis);
+
+ // special handling for https
+ if (sslContext != null && connection instanceof HttpsURLConnection) {
+ HttpsURLConnection secureConnection = (HttpsURLConnection) connection;
+ secureConnection.setSSLSocketFactory(sslContext.getSocketFactory());
+
+ // check the trusted hostname property and override the HostnameVerifier
+ secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(),
+ secureConnection.getHostnameVerifier()));
+ }
+
+ return connection;
+ }
+
+ public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException {
+ final HttpURLConnection connection = getConnection(url, timeoutMillis);
+ connection.setRequestMethod("GET");
+ final int responseCode = connection.getResponseCode();
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ StreamUtils.copy(connection.getInputStream(), baos);
+ final String responseMessage = baos.toString();
+
+ if ( responseCode == RESPONSE_CODE_OK ) {
+ final ObjectMapper mapper = new ObjectMapper();
+ final JsonNode jsonNode = mapper.readTree(responseMessage);
+ final JsonNode controllerNode = jsonNode.get("controller");
+ return mapper.readValue(controllerNode, ControllerDTO.class);
+ } else {
+ throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage);
+ }
+ }
+
+ private static class OverrideHostnameVerifier implements HostnameVerifier {
+ private final String trustedHostname;
+ private final HostnameVerifier delegate;
+
+ private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
+ this.trustedHostname = trustedHostname;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean verify(String hostname, SSLSession session) {
+ if (trustedHostname.equalsIgnoreCase(hostname)) {
+ return true;
+ }
+ return delegate.verify(hostname, session);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
deleted file mode 100644
index b2dbdcd..0000000
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.util;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.entity.ControllerEntity;
-import org.apache.nifi.web.util.WebUtils;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.UniformInterfaceException;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-
-/**
- *
- */
-public class RemoteNiFiUtils {
-
- public static final String CONTROLLER_URI_PATH = "/controller";
-
- private static final int CONNECT_TIMEOUT = 10000;
- private static final int READ_TIMEOUT = 10000;
-
- private final Client client;
-
- public RemoteNiFiUtils(final SSLContext sslContext) {
- this.client = getClient(sslContext);
- }
-
-
- /**
- * Gets the content at the specified URI.
- *
- * @param uri
- * @param timeoutMillis
- * @return
- * @throws ClientHandlerException
- * @throws UniformInterfaceException
- */
- public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
- return get(uri, timeoutMillis, null);
- }
-
- /**
- * Gets the content at the specified URI using the given query parameters.
- *
- * @param uri
- * @param timeoutMillis
- * @param queryParams
- * @return
- * @throws ClientHandlerException
- * @throws UniformInterfaceException
- */
- public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
- // perform the request
- WebResource webResource = client.resource(uri);
- if ( queryParams != null ) {
- for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
- webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
- }
- }
-
- webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
- webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
-
- return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- }
-
- /**
- * Performs a HEAD request to the specified URI.
- *
- * @param uri
- * @param timeoutMillis
- * @return
- * @throws ClientHandlerException
- * @throws UniformInterfaceException
- */
- public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
- // perform the request
- WebResource webResource = client.resource(uri);
- webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
- webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
- return webResource.head();
- }
-
- /**
- * Gets a client based on the specified URI.
- *
- * @param uri
- * @return
- */
- private Client getClient(final SSLContext sslContext) {
- final Client client;
- if (sslContext == null) {
- client = WebUtils.createClient(null);
- } else {
- client = WebUtils.createClient(null, sslContext);
- }
-
- client.setReadTimeout(READ_TIMEOUT);
- client.setConnectTimeout(CONNECT_TIMEOUT);
-
- return client;
- }
-
-
- /**
- * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
- * is not configured to use Site-to-Site transfers.
- *
- * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
- * @param timeoutMillis
- * @return
- * @throws IOException
- */
- public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
- try {
- final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
- return getRemoteListeningPort(uriObject, timeoutMillis);
- } catch (URISyntaxException e) {
- throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
- }
- }
-
- public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
- try {
- final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
- return getRemoteRootGroupId(uriObject, timeoutMillis);
- } catch (URISyntaxException e) {
- throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
- }
- }
-
- public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
- try {
- final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
- return getController(uriObject, timeoutMillis).getInstanceId();
- } catch (URISyntaxException e) {
- throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
- }
- }
-
- /**
- * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
- * is not configured to use Site-to-Site transfers.
- *
- * @param uri the full URI to fetch, including the path.
- * @return
- * @throws IOException
- */
- private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
- return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
- }
-
- private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
- return getController(uri, timeoutMillis).getId();
- }
-
- public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
- final ClientResponse response = get(uri, timeoutMillis);
-
- if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
- final ControllerEntity entity = response.getEntity(ControllerEntity.class);
- return entity.getController();
- } else {
- final String responseMessage = response.getEntity(String.class);
- throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
- }
- }
-
- /**
- * Issues a registration request on behalf of the current user.
- *
- * @param baseApiUri
- * @return
- */
- public ClientResponse issueRegistrationRequest(String baseApiUri) {
- final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
-
- // set up the query params
- MultivaluedMapImpl entity = new MultivaluedMapImpl();
- entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
-
- // create the web resource
- WebResource webResource = client.resource(uri);
-
- // get the client utils and make the request
- return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index 8781421..bb16a34 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -36,7 +36,7 @@ import org.junit.Test;
public class TestSiteToSiteClient {
@Test
- @Ignore("For local testing only; not really a unit test but a manual test")
+ //@Ignore("For local testing only; not really a unit test but a manual test")
public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
@@ -58,7 +58,6 @@ public class TestSiteToSiteClient {
final byte[] buff = new byte[(int) size];
StreamUtils.fillBuffer(in, buff);
- System.out.println(buff.length);
Assert.assertNull(transaction.receive());
@@ -71,7 +70,7 @@ public class TestSiteToSiteClient {
@Test
- @Ignore("For local testing only; not really a unit test but a manual test")
+ //@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
@@ -84,12 +83,12 @@ public class TestSiteToSiteClient {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
Assert.assertNotNull(transaction);
- final Map<String, String> attrs = new HashMap<>();
- attrs.put("site-to-site", "yes, please!");
- final byte[] bytes = "Hello".getBytes();
- final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
- transaction.send(packet);
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("site-to-site", "yes, please!");
+ final byte[] bytes = "Hello".getBytes();
+ final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
+ transaction.send(packet);
transaction.confirm();
transaction.complete();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java
new file mode 100644
index 0000000..23dfdda
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ *
+ */
+public class RemoteNiFiUtils {
+
+ public static final String CONTROLLER_URI_PATH = "/controller";
+
+ private static final int CONNECT_TIMEOUT = 10000;
+ private static final int READ_TIMEOUT = 10000;
+
+ private final Client client;
+
+ public RemoteNiFiUtils(final SSLContext sslContext) {
+ this.client = getClient(sslContext);
+ }
+
+
+ /**
+ * Gets the content at the specified URI.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+ return get(uri, timeoutMillis, null);
+ }
+
+ /**
+ * Gets the content at the specified URI using the given query parameters.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @param queryParams
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+ // perform the request
+ WebResource webResource = client.resource(uri);
+ if ( queryParams != null ) {
+ for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
+ webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+ }
+ }
+
+ webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+
+ return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ }
+
+ /**
+ * Performs a HEAD request to the specified URI.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+ // perform the request
+ WebResource webResource = client.resource(uri);
+ webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+ return webResource.head();
+ }
+
+ /**
+ * Gets a client based on the specified URI.
+ *
+ * @param uri
+ * @return
+ */
+ private Client getClient(final SSLContext sslContext) {
+ final Client client;
+ if (sslContext == null) {
+ client = WebUtils.createClient(null);
+ } else {
+ client = WebUtils.createClient(null, sslContext);
+ }
+
+ client.setReadTimeout(READ_TIMEOUT);
+ client.setConnectTimeout(CONNECT_TIMEOUT);
+
+ return client;
+ }
+
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+ * @param timeoutMillis
+ * @return
+ * @throws IOException
+ */
+ public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteListeningPort(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteRootGroupId(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getController(uriObject, timeoutMillis).getInstanceId();
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the full URI to fetch, including the path.
+ * @return
+ * @throws IOException
+ */
+ private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+ }
+
+ private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getId();
+ }
+
+ public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+ final ClientResponse response = get(uri, timeoutMillis);
+
+ if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+ final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+ return entity.getController();
+ } else {
+ final String responseMessage = response.getEntity(String.class);
+ throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+ }
+ }
+
+ /**
+ * Issues a registration request on behalf of the current user.
+ *
+ * @param baseApiUri
+ * @return
+ */
+ public ClientResponse issueRegistrationRequest(String baseApiUri) {
+ final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+ // set up the query params
+ MultivaluedMapImpl entity = new MultivaluedMapImpl();
+ entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+
+ // create the web resource
+ WebResource webResource = client.resource(uri);
+
+ // get the client utils and make the request
+ return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 79ef7a8..6b70fe6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;