You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/12/02 19:02:29 UTC
[1/2] nifi git commit: NIFI-3026: Support multiple remote target URLs
Repository: nifi
Updated Branches:
refs/heads/master d8d29811f -> 7c5bd876b
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 091f00e..1fd4d32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -24,8 +24,6 @@ import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -91,10 +89,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final String id;
- private final URI targetUri;
- private final URI apiUri;
- private final String host;
- private final String protocol;
+ private final String targetUris;
private final ProcessScheduler scheduler;
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
@@ -136,30 +131,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ScheduledExecutorService backgroundThreadExecutor;
- public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup,
- final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
+ public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup,
+ final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
this.nifiProperties = nifiProperties;
this.id = requireNonNull(id);
this.flowController = requireNonNull(flowController);
- final URI uri;
- try {
- uri = new URI(requireNonNull(targetUri.trim()));
-
- final String apiPath = SiteToSiteRestApiClient.resolveBaseUrl(uri);
-
- apiUri = new URI(apiPath);
- } catch (final URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- this.host = uri.getHost();
- this.protocol = uri.getAuthority();
- this.targetUri = uri;
+ this.targetUris = targetUris;
this.targetId = null;
this.processGroup = new AtomicReference<>(processGroup);
this.sslContext = sslContext;
this.scheduler = flowController.getProcessScheduler();
- this.authorizationIssue = "Establishing connection to " + targetUri;
+ this.authorizationIssue = "Establishing connection to " + targetUris;
final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
eventReporter = new EventReporter() {
@@ -176,7 +159,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
};
final Runnable checkAuthorizations = new InitializationTask();
- backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
+ backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUris);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
}
@@ -298,14 +281,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return targetId;
}
- public String getProtocol() {
- return protocol;
- }
-
@Override
public String getName() {
final String name = this.name.get();
- return name == null ? targetUri.toString() : name;
+ return name == null ? getTargetUri() : name;
}
@Override
@@ -361,17 +340,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
@Override
- public URI getTargetUri() {
- return targetUri;
+ public String getTargetUri() {
+ return SiteToSiteRestApiClient.getFirstUrl(targetUris);
}
@Override
- public String getAuthorizationIssue() {
- return authorizationIssue;
+ public String getTargetUris() {
+ return targetUris;
}
- public String getHost() {
- return host;
+ @Override
+ public String getAuthorizationIssue() {
+ return authorizationIssue;
}
public int getInputPortCount() {
@@ -739,7 +719,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public String toString() {
- return "RemoteProcessGroup[" + targetUri + "]";
+ return "RemoteProcessGroup[" + targetUris + "]";
}
@Override
@@ -786,7 +766,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// perform the request
final ControllerDTO dto;
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
- dto = apiClient.getController();
+ dto = apiClient.getController(targetUris);
} catch (IOException e) {
writeLock.lock();
try {
@@ -807,7 +787,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
writeLock.unlock();
}
- throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + getApiUri() + " due to: " + e.getMessage());
+ throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + targetUris + " due to: " + e.getMessage());
}
writeLock.lock();
@@ -878,16 +858,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter());
- apiClient.setBaseUrl(getApiUri());
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
return apiClient;
}
- protected String getApiUri() {
- return apiUri.toString();
- }
-
/**
* Converts a set of ports into a set of remote process group ports.
*
@@ -1092,10 +1067,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- private boolean isWebApiSecure() {
- return targetUri.toString().toLowerCase().startsWith("https");
- }
-
@Override
public boolean isSiteToSiteEnabled() {
readLock.lock();
@@ -1117,7 +1088,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public void run() {
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
try {
- final ControllerDTO dto = apiClient.getController();
+ final ControllerDTO dto = apiClient.getController(targetUris);
if (dto.getRemoteSiteListeningPort() == null && SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
authorizationIssue = "Remote instance is not configured to allow RAW Site-to-Site communications at this time.";
@@ -1140,8 +1111,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) {
try {
// attempt to issue a registration request in case the target instance is a 0.x
- final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
- final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
+ final boolean isApiSecure = apiClient.getBaseUrl().toLowerCase().startsWith("https");
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(isApiSecure ? sslContext : null);
+ final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiClient.getBaseUrl());
if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) {
logger.info("{} Issued a Request to communicate with remote instance", this);
} else {
@@ -1169,7 +1141,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} catch (final Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
- StandardRemoteProcessGroup.this.getTargetUri().toString(), e));
+ StandardRemoteProcessGroup.this.getTargetUris(), e));
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java
deleted file mode 100644
index 69d38e9..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.remote;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.util.NiFiProperties;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestStandardRemoteProcessGroup {
-
- @Test
- public void testApiUri() {
- final NiFiProperties properties = Mockito.mock(NiFiProperties.class);
- final FlowController controller = Mockito.mock(FlowController.class);
- final ProcessGroup group = Mockito.mock(ProcessGroup.class);
-
- final String expectedUri = "http://localhost:8080/nifi-api";
- StandardRemoteProcessGroup rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
-
- rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi/", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
-
- rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi/ ", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
-
- rpg = new StandardRemoteProcessGroup("id", " http://localhost:8080/nifi/ ", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
-
- rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
-
- rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
-
- rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080 ", group, controller, null, properties);
- assertEquals(expectedUri, rpg.getApiUri());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 48d60d6..3a23601 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -49,6 +49,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -143,7 +144,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS);
final SiteToSiteClient client = new SiteToSiteClient.Builder()
- .url(remoteGroup.getTargetUri().toString())
+ .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
.portIdentifier(getIdentifier())
.sslContext(sslContext)
.useCompression(isUseCompression())
@@ -169,7 +170,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
- final String url = getRemoteProcessGroup().getTargetUri().toString();
+ final String url = getRemoteProcessGroup().getTargetUri();
// If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
// we don't want to create a transaction at all.
@@ -433,7 +434,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public String toString() {
- return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]";
+ return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]";
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index b44f118..23d3fda 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -39,7 +39,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.InputStream;
-import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
@@ -108,7 +107,7 @@ public class TestStandardRemoteGroupPort {
doReturn(true).when(remoteGroup).isTransmitting();
doReturn(protocol).when(remoteGroup).getTransportProtocol();
- doReturn(new URI(REMOTE_CLUSTER_URL)).when(remoteGroup).getTargetUri();
+ doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri();
doReturn(siteToSiteClient).when(port).getSiteToSiteClient();
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
doReturn(eventReporter).when(remoteGroup).getEventReporter();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
index d9a5df6..e119437 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
@@ -236,7 +236,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// create the remote process group details
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
- remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString());
+ remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
// save the actions if necessary
if (!details.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 8b9366f..5809a06 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -36,6 +36,7 @@ import org.apache.nifi.authorization.TemplateAuthorizable;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
@@ -1356,31 +1357,12 @@ public class ProcessGroupResource extends ApplicationResource {
// set the processor id as appropriate
remoteProcessGroupDTO.setId(generateUuid());
- // parse the uri
- final URI uri;
- try {
- uri = URI.create(remoteProcessGroupDTO.getTargetUri());
- } catch (final IllegalArgumentException e) {
- throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri());
- }
-
- // validate each part of the uri
- if (uri.getScheme() == null || uri.getHost() == null) {
- throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri());
- }
-
- if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) {
- throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + remoteProcessGroupDTO.getTargetUri());
- }
-
- // normalize the uri to the other controller
- String controllerUri = uri.toString();
- if (controllerUri.endsWith("/")) {
- controllerUri = StringUtils.substringBeforeLast(controllerUri, "/");
- }
+ // parse the target URI(s) to check that they are valid
+ final String targetUris = remoteProcessGroupDTO.getTargetUris();
+ SiteToSiteRestApiClient.parseClusterUrls(targetUris);
- // since the uri is valid, use the normalized version
- remoteProcessGroupDTO.setTargetUri(controllerUri);
+ // since the target URI(s) are valid, use them as given
+ remoteProcessGroupDTO.setTargetUris(targetUris);
// create the remote process group
final Revision revision = getRevision(remoteProcessGroupEntity, remoteProcessGroupDTO.getId());
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 3f48128..ec0392d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1533,7 +1533,7 @@ public final class DtoFactory {
dto.setCommunicationsTimeout(group.getCommunicationsTimeout());
dto.setYieldDuration(group.getYieldDuration());
dto.setParentGroupId(group.getProcessGroup().getIdentifier());
- dto.setTargetUri(group.getTargetUri().toString());
+ dto.setTargetUris(group.getTargetUris());
dto.setFlowRefreshed(group.getLastRefreshTime());
dto.setContents(contents);
dto.setTransportProtocol(group.getTransportProtocol().name());
@@ -2857,7 +2857,7 @@ public final class DtoFactory {
copy.setActiveRemoteOutputPortCount(original.getActiveRemoteOutputPortCount());
copy.setInactiveRemoteOutputPortCount(original.getInactiveRemoteOutputPortCount());
copy.setParentGroupId(original.getParentGroupId());
- copy.setTargetUri(original.getTargetUri());
+ copy.setTargetUris(original.getTargetUris());
copy.setTransportProtocol(original.getTransportProtocol());
copy.setProxyHost(original.getProxyHost());
copy.setProxyPort(original.getProxyPort());
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 30fcbd7..2db2bbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -1805,7 +1805,7 @@ public class ControllerFacade implements Authorizable {
addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
addIfAppropriate(searchStr, group.getName(), "Name", matches);
addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
- addIfAppropriate(searchStr, group.getTargetUri().toString(), "URL", matches);
+ addIfAppropriate(searchStr, group.getTargetUris(), "URLs", matches);
// consider the transmission status
if ((StringUtils.containsIgnoreCase("transmitting", searchStr) || StringUtils.containsIgnoreCase("transmission enabled", searchStr)) && group.isTransmitting()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index bf4c96e..d022b15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -75,13 +75,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Remote Process Group is being added.");
}
- final String rawTargetUri = remoteProcessGroupDTO.getTargetUri();
- if (rawTargetUri == null) {
- throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI");
+ final String targetUris = remoteProcessGroupDTO.getTargetUris();
+ if (targetUris == null || targetUris.length() == 0) {
+ throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI(s)");
}
// create the remote process group
- RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), rawTargetUri);
+ RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
// set other properties
updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp
index ab6c3ae..4af046e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp
@@ -18,9 +18,11 @@
<div id="new-remote-process-group-dialog" class="hidden large-dialog">
<div class="dialog-content">
<div class="setting">
- <div class="setting-name">URL</div>
+ <div class="setting-name">URLs
+ <div class="fa fa-question-circle" alt="Info" title="Specify the remote target NiFi URLs. Multiple URLs can be specified in comma-separated format. Different protocols cannot be mixed. If remote NiFi is a cluster, two or more node URLs are recommended for better connection establishment availability."></div>
+ </div>
<div class="setting-field">
- <input id="new-remote-process-group-uri" type="text" placeholder="https://remotehost:8080/nifi"/>
+ <input id="new-remote-process-group-uris" type="text" placeholder="https://remotehost:8080/nifi"/>
</div>
</div>
<div class="setting">
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp
index 8ad6a73..c8af6d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp
@@ -30,9 +30,9 @@
</div>
</div>
<div class="setting">
- <div class="setting-name">URL</div>
+ <div class="setting-name">URLs</div>
<div class="setting-field">
- <span id="remote-process-group-url"></span>
+ <span id="remote-process-group-urls"></span>
</div>
</div>
<div class="setting">
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp
index e368d46..6f7f992 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp
@@ -30,9 +30,9 @@
</div>
</div>
<div class="setting">
- <div class="setting-name">URL</div>
+ <div class="setting-name">URLs</div>
<div class="setting-field">
- <span id="read-only-remote-process-group-url"></span>
+ <span id="read-only-remote-process-group-urls"></span>
</div>
</div>
<div class="setting">
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp
index 8899f33..672800c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp
@@ -33,9 +33,9 @@
<div class="spacer"> </div>
<div class="settings-right">
<div class="setting">
- <div class="setting-name">URL</div>
+ <div class="setting-name">URLs</div>
<div class="setting-field">
- <span id="remote-process-group-ports-url"></span>
+ <span id="remote-process-group-ports-urls"></span>
</div>
</div>
<div class="remote-port-header">
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js
index 7bf2633..90e67b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js
@@ -34,7 +34,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) {
}
}),
'component': {
- 'targetUri': $('#new-remote-process-group-uri').val(),
+ 'targetUris': $('#new-remote-process-group-uris').val(),
'position': {
'x': pt.x,
'y': pt.y
@@ -125,7 +125,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) {
headerText: 'Add Remote Process Group',
handler: {
close: function () {
- $('#new-remote-process-group-uri').val('');
+ $('#new-remote-process-group-uris').val('');
$('#new-remote-process-group-timeout').val(defaultTimeout);
$('#new-remote-process-group-yield-duration').val(defaultYieldDuration);
$('#new-remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
@@ -265,7 +265,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) {
this.modal.show();
// set the focus and key handlers
- $('#new-remote-process-group-uri').focus().off('keyup').on('keyup', function (e) {
+ $('#new-remote-process-group-uris').focus().off('keyup').on('keyup', function (e) {
var code = e.keyCode ? e.keyCode : e.which;
if (code === $.ui.keyCode.ENTER) {
addRemoteProcessGroup();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js
index 6a33af7..ece8be6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js
@@ -106,7 +106,7 @@ nf.RemoteProcessGroupConfiguration = (function () {
// clear the remote process group details
$('#remote-process-group-id').text('');
$('#remote-process-group-name').text('');
- $('#remote-process-group-url').text('');
+ $('#remote-process-group-urls').text('');
$('#remote-process-group-timeout').val('');
$('#remote-process-group-yield-duration').val('');
$('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
@@ -144,7 +144,7 @@ nf.RemoteProcessGroupConfiguration = (function () {
// populate the port settings
$('#remote-process-group-id').text(selectionData.id);
$('#remote-process-group-name').text(selectionData.component.name);
- $('#remote-process-group-url').text(selectionData.component.targetUri);
+ $('#remote-process-group-urls').text(selectionData.component.targetUris);
// populate the text fields
$('#remote-process-group-timeout').val(selectionData.component.communicationsTimeout);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js
index d757496..ebe116c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js
@@ -41,7 +41,7 @@ nf.RemoteProcessGroupDetails = (function () {
// clear the remote process group details
nf.Common.clearField('read-only-remote-process-group-id');
nf.Common.clearField('read-only-remote-process-group-name');
- nf.Common.clearField('read-only-remote-process-group-url');
+ nf.Common.clearField('read-only-remote-process-group-urls');
nf.Common.clearField('read-only-remote-process-group-timeout');
nf.Common.clearField('read-only-remote-process-group-yield-duration');
nf.Common.clearField('read-only-remote-process-group-transport-protocol');
@@ -67,7 +67,7 @@ nf.RemoteProcessGroupDetails = (function () {
// populate the port settings
nf.Common.populateField('read-only-remote-process-group-id', selectionData.id);
nf.Common.populateField('read-only-remote-process-group-name', selectionData.component.name);
- nf.Common.populateField('read-only-remote-process-group-url', selectionData.component.targetUri);
+ nf.Common.populateField('read-only-remote-process-group-urls', selectionData.component.targetUris);
nf.Common.populateField('read-only-remote-process-group-timeout', selectionData.component.communicationsTimeout);
nf.Common.populateField('read-only-remote-process-group-yield-duration', selectionData.component.yieldDuration);
nf.Common.populateField('read-only-remote-process-group-transport-protocol', selectionData.component.transportProtocol);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js
index 634a65f..529cda7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js
@@ -179,7 +179,7 @@ nf.RemoteProcessGroupPorts = (function () {
// clear the remote process group details
$('#remote-process-group-ports-id').text('');
$('#remote-process-group-ports-name').text('');
- $('#remote-process-group-ports-url').text('');
+ $('#remote-process-group-ports-urls').text('');
// clear any tooltips
var dialog = $('#remote-process-group-ports');
@@ -484,7 +484,7 @@ nf.RemoteProcessGroupPorts = (function () {
// populate the port settings
$('#remote-process-group-ports-id').text(remoteProcessGroup.id);
$('#remote-process-group-ports-name').text(remoteProcessGroup.name);
- $('#remote-process-group-ports-url').text(remoteProcessGroup.targetUri);
+ $('#remote-process-group-ports-urls').text(remoteProcessGroup.targetUris);
// get the contents
var remoteProcessGroupContents = remoteProcessGroup.contents;
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js
index 6e14bf6..1903e44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js
@@ -499,7 +499,7 @@ nf.RemoteProcessGroup = (function () {
remoteProcessGroupUri.text(null).selectAll('title').remove();
// apply ellipsis to the remote process group name as necessary
- nf.CanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUri);
+ nf.CanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUris);
}).append('title').text(function (d) {
return d.component.name;
});
[2/2] nifi git commit: NIFI-3026: Support multiple remote target URLs
Posted by ma...@apache.org.
NIFI-3026: Support multiple remote target URLs
- Added urls in addition to the existing url, to support multiple target
URLs
- Backward compatibility is provided by returning the first url if
multiple urls are specified but the component accessing the url doesn't
support multiple urls
- UI is not fully updated yet. Following UI components are planned to be updated
by different commits
- Search component: only the first URL is searchable and shown
- Component status: RPG status shows only the first URL
- Component action history: only the first URL is searchable and shown
- Updated Search component to use URLs.
This closes #1208.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7c5bd876
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7c5bd876
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7c5bd876
Branch: refs/heads/master
Commit: 7c5bd876bdd49ec14482a24dd31a073757f06ed4
Parents: d8d2981
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Nov 30 09:28:16 2016 +0900
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Dec 2 14:01:39 2016 -0500
----------------------------------------------------------------------
.../remote/client/AbstractSiteToSiteClient.java | 13 +-
.../nifi/remote/client/SiteInfoProvider.java | 63 +++++-
.../nifi/remote/client/SiteToSiteClient.java | 61 ++++--
.../remote/client/SiteToSiteClientConfig.java | 12 ++
.../nifi/remote/client/http/HttpClient.java | 18 +-
.../client/socket/EndpointConnectionPool.java | 12 +-
.../nifi/remote/client/socket/SocketClient.java | 2 +-
.../remote/util/SiteToSiteRestApiClient.java | 132 +++++++++++-
.../remote/client/TestSiteInfoProvider.java | 207 +++++++++++++++++++
.../nifi/remote/client/http/TestHttpClient.java | 27 ++-
.../client/socket/TestSiteToSiteClient.java | 17 +-
.../util/TestSiteToSiteRestApiClient.java | 182 +++++++++-------
.../nifi/web/api/dto/RemoteProcessGroupDTO.java | 50 ++++-
.../web/api/dto/TestRemoteProcessGroupDTO.java | 49 +++++
.../apache/nifi/groups/RemoteProcessGroup.java | 5 +-
.../apache/nifi/controller/FlowController.java | 10 +-
.../controller/StandardFlowSynchronizer.java | 2 +-
.../serialization/FlowFromDOMFactory.java | 1 +
.../serialization/StandardFlowSerializer.java | 3 +-
.../nifi/remote/StandardRemoteProcessGroup.java | 72 ++-----
.../remote/TestStandardRemoteProcessGroup.java | 59 ------
.../nifi/remote/StandardRemoteGroupPort.java | 7 +-
.../remote/TestStandardRemoteGroupPort.java | 3 +-
.../nifi/audit/RemoteProcessGroupAuditor.java | 2 +-
.../nifi/web/api/ProcessGroupResource.java | 30 +--
.../org/apache/nifi/web/api/dto/DtoFactory.java | 4 +-
.../nifi/web/controller/ControllerFacade.java | 2 +-
.../dao/impl/StandardRemoteProcessGroupDAO.java | 8 +-
.../canvas/new-remote-process-group-dialog.jsp | 6 +-
.../remote-process-group-configuration.jsp | 4 +-
.../canvas/remote-process-group-details.jsp | 4 +-
.../canvas/remote-process-group-ports.jsp | 4 +-
.../nf-ng-remote-process-group-component.js | 6 +-
.../nf-remote-process-group-configuration.js | 4 +-
.../canvas/nf-remote-process-group-details.js | 4 +-
.../nf/canvas/nf-remote-process-group-ports.js | 4 +-
.../js/nf/canvas/nf-remote-process-group.js | 2 +-
37 files changed, 785 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
index 0dec3df..05cf1f9 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
@@ -16,30 +16,19 @@
*/
package org.apache.nifi.remote.client;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Objects;
import java.util.concurrent.TimeUnit;
public abstract class AbstractSiteToSiteClient implements SiteToSiteClient {
protected final SiteToSiteClientConfig config;
protected final SiteInfoProvider siteInfoProvider;
- protected final URI clusterUrl;
public AbstractSiteToSiteClient(final SiteToSiteClientConfig config) {
this.config = config;
- try {
- Objects.requireNonNull(config.getUrl(), "URL cannot be null");
- clusterUrl = new URI(config.getUrl());
- } catch (final URISyntaxException e) {
- throw new IllegalArgumentException("Invalid Cluster URL: " + config.getUrl());
- }
-
final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS);
siteInfoProvider = new SiteInfoProvider();
- siteInfoProvider.setClusterUrl(clusterUrl);
+ siteInfoProvider.setClusterUrls(config.getUrls());
siteInfoProvider.setSslContext(config.getSslContext());
siteInfoProvider.setConnectTimeoutMillis(commsTimeout);
siteInfoProvider.setReadTimeoutMillis(commsTimeout);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
index aac7912..a1a9a9c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
@@ -18,8 +18,10 @@ package org.apache.nifi.remote.client;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -50,19 +52,26 @@ public class SiteInfoProvider {
private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
- private URI clusterUrl;
+ private Set<String> clusterUrls;
+ private URI activeClusterUrl;
private SSLContext sslContext;
private int connectTimeoutMillis;
private int readTimeoutMillis;
private ControllerDTO refreshRemoteInfo() throws IOException {
- final ControllerDTO controller;
- try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP)) {
- apiClient.setBaseUrl(SiteToSiteRestApiClient.resolveBaseUrl(clusterUrl));
+ final ControllerDTO controller;
+ final URI connectedClusterUrl;
+ try (final SiteToSiteRestApiClient apiClient = createSiteToSiteRestApiClient(sslContext, proxy)) {
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);
- controller = apiClient.getController();
+ controller = apiClient.getController(clusterUrls);
+ try {
+ connectedClusterUrl = new URI(apiClient.getBaseUrl());
+ } catch (URISyntaxException e) {
+ // This should not happen since apiClient has successfully communicated with this URL.
+ throw new RuntimeException("Failed to parse connected cluster URL due to " + e);
+ }
}
remoteInfoWriteLock.lock();
@@ -70,6 +79,7 @@ public class SiteInfoProvider {
this.siteToSitePort = controller.getRemoteSiteListeningPort();
this.siteToSiteHttpPort = controller.getRemoteSiteHttpListeningPort();
this.siteToSiteSecure = controller.isSiteToSiteSecure();
+ this.activeClusterUrl = connectedClusterUrl;
inputPortMap.clear();
for (final PortDTO inputPort : controller.getInputPorts()) {
@@ -89,8 +99,12 @@ public class SiteInfoProvider {
return controller;
}
+ protected SiteToSiteRestApiClient createSiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) {
+ return new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP);
+ }
+
public boolean isWebInterfaceSecure() {
- return clusterUrl.toString().startsWith("https");
+ return clusterUrls.stream().anyMatch(url -> url.startsWith("https"));
}
/**
@@ -162,7 +176,7 @@ public class SiteInfoProvider {
final ControllerDTO controller = refreshRemoteInfo();
final Boolean isSecure = controller.isSiteToSiteSecure();
if (isSecure == null) {
- throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
+ throw new IOException("Remote NiFi instance " + clusterUrls + " is not currently configured to accept site-to-site connections");
}
return isSecure;
@@ -207,8 +221,39 @@ public class SiteInfoProvider {
}
}
- public void setClusterUrl(URI clusterUrl) {
- this.clusterUrl = clusterUrl;
+ /**
+ * Return an active cluster URL that is known to work.
+ * If it is unknown yet or cache is expired, then remote info will be refreshed.
+ * @return an active cluster URL
+ */
+ public URI getActiveClusterUrl() throws IOException {
+ URI resultClusterUrl;
+ remoteInfoReadLock.lock();
+ try {
+ resultClusterUrl = this.activeClusterUrl;
+ if (resultClusterUrl != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return resultClusterUrl;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ refreshRemoteInfo();
+
+ remoteInfoReadLock.lock();
+ try {
+ return this.activeClusterUrl;
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+ }
+
+ public void setClusterUrls(Set<String> clusterUrls) {
+ this.clusterUrls = clusterUrls;
+ }
+
+ public Set<String> getClusterUrls() {
+ return clusterUrls;
}
public void setSslContext(SSLContext sslContext) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 94ebca6..3d7bacc 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -41,6 +41,8 @@ import java.io.InputStream;
import java.io.Serializable;
import java.security.KeyStore;
import java.security.SecureRandom;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -147,7 +149,7 @@ public interface SiteToSiteClient extends Closeable {
private static final long serialVersionUID = -4954962284343090219L;
- private String url;
+ private Set<String> urls;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
@@ -176,7 +178,7 @@ public interface SiteToSiteClient extends Closeable {
* @return the builder
*/
public Builder fromConfig(final SiteToSiteClientConfig config) {
- this.url = config.getUrl();
+ this.urls = config.getUrls();
this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS);
this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
@@ -202,15 +204,37 @@ public interface SiteToSiteClient extends Closeable {
}
/**
- * Specifies the URL of the remote NiFi instance. If this URL points to
- * the Cluster Manager of a NiFi cluster, data transfer to and from
- * nodes will be automatically load balanced across the different nodes.
+ * <p>Specifies the URL of the remote NiFi instance.</p>
+ * <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
+ * nodes will be automatically load balanced across the different nodes.</p>
+ *
+ * <p>For better connectivity with a NiFi cluster, use {@link #urls(Set)} instead.</p>
*
* @param url url of remote instance
* @return the builder
*/
public Builder url(final String url) {
- this.url = url;
+ final Set<String> urls = new LinkedHashSet<>();
+ if (url != null && url.length() > 0) {
+ urls.add(url);
+ }
+ this.urls = urls;
+ return this;
+ }
+
+ /**
+ * <p>Specifies the URLs of the remote NiFi instance.</p>
+ * <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
+ * nodes will be automatically load balanced across the different nodes.</p>
+ *
+ * <p>Multiple urls provide better connectivity with a NiFi cluster, able to connect
+ * to the target cluster as long as one of the specified urls is accessible.</p>
+ *
+ * @param urls urls of remote instance
+ * @return the builder
+ */
+ public Builder urls(final Set<String> urls) {
+ this.urls = urls;
return this;
}
@@ -542,7 +566,7 @@ public interface SiteToSiteClient extends Closeable {
* or if the transport protocol is not supported.
*/
public SiteToSiteClient build() {
- if (url == null) {
+ if (urls == null) {
throw new IllegalStateException("Must specify URL to build Site-to-Site client");
}
@@ -564,7 +588,10 @@ public interface SiteToSiteClient extends Closeable {
* @return the configured URL for the remote NiFi instance
*/
public String getUrl() {
- return url;
+ if (urls != null && urls.size() > 0) {
+ return urls.iterator().next();
+ }
+ return null;
}
/**
@@ -668,7 +695,8 @@ public interface SiteToSiteClient extends Closeable {
private static final long serialVersionUID = 1L;
- private final String url;
+ // This Set instance has to be initialized here to be serialized via Kryo.
+ private final Set<String> urls = new LinkedHashSet<>();
private final long timeoutNanos;
private final long penalizationNanos;
private final long idleExpirationNanos;
@@ -692,7 +720,6 @@ public interface SiteToSiteClient extends Closeable {
// some serialization frameworks require a default constructor
private StandardSiteToSiteClientConfig() {
- this.url = null;
this.timeoutNanos = 0;
this.penalizationNanos = 0;
this.idleExpirationNanos = 0;
@@ -716,7 +743,9 @@ public interface SiteToSiteClient extends Closeable {
}
private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
- this.url = builder.url;
+ if (builder.urls != null) {
+ this.urls.addAll(builder.urls);
+ }
this.timeoutNanos = builder.timeoutNanos;
this.penalizationNanos = builder.penalizationNanos;
this.idleExpirationNanos = builder.idleExpirationNanos;
@@ -746,7 +775,15 @@ public interface SiteToSiteClient extends Closeable {
@Override
public String getUrl() {
- return url;
+ if (urls != null && urls.size() > 0) {
+ return urls.iterator().next();
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> getUrls() {
+ return urls;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 65a7cfc..5bdeee4 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import java.io.File;
import java.io.Serializable;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -31,10 +32,21 @@ public interface SiteToSiteClientConfig extends Serializable {
/**
* @return the configured URL for the remote NiFi instance
+ * @deprecated This method only returns single URL string even if multiple URLs are set
+ * for backward compatibility for implementations that does not expect multiple URLs.
+ * {@link #getUrls()} should be used instead then should support multiple URLs when making requests.
*/
String getUrl();
/**
+ * SiteToSite implementations should support multiple URLs when establishing a SiteToSite connection with a remote
+ * NiFi instance to provide robust connectivity so that it can keep working as long as at least one of
+ * the configured URLs is accessible.
+ * @return the configured URLs for the remote NiFi instances.
+ */
+ Set<String> getUrls();
+
+ /**
* @param timeUnit unit over which to report the timeout
* @return the communications timeout in given unit
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index b275265..c933db7 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -92,12 +91,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications");
}
- final URI clusterUrl;
- try {
- clusterUrl = new URI(config.getUrl());
- } catch (final URISyntaxException e) {
- throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
- }
+ final URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
return new PeerDescription(clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort(), siteInfoProvider.isSecure());
}
@@ -135,8 +129,14 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
final CommunicationsSession commSession = new HttpCommunicationsSession();
final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
- final String clusterUrl = config.getUrl();
- final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
+ final StringBuilder clusterUrls = new StringBuilder();
+ config.getUrls().forEach(url -> {
+ if (clusterUrls.length() > 0) {
+ clusterUrls.append(",");
+ clusterUrls.append(url);
+ }
+ });
+ final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrls.toString());
final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
String portId = config.getPortIdentifier();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index a17deaa..6f08f73 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -73,7 +73,6 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
- private final URI clusterUrl;
private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<>());
@@ -85,17 +84,14 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private volatile int commsTimeout;
private volatile boolean shutdown = false;
- private volatile Set<PeerStatus> lastFetchedQueryablePeers;
private final SiteInfoProvider siteInfoProvider;
private final PeerSelector peerSelector;
- public EndpointConnectionPool(final URI clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
+ public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) {
- Objects.requireNonNull(clusterUrl, "URL cannot be null");
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
- this.clusterUrl = clusterUrl;
this.remoteDestination = remoteDestination;
this.sslContext = sslContext;
this.eventReporter = eventReporter;
@@ -156,6 +152,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
SocketClientProtocol protocol = null;
EndpointConnection connection;
Peer peer = null;
+ URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
do {
final List<EndpointConnection> addBack = new ArrayList<>();
@@ -361,7 +358,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
@Override
public PeerDescription getBootstrapPeerDescription() throws IOException {
- final String hostname = clusterUrl.getHost();
+ final String hostname = siteInfoProvider.getActiveClusterUrl().getHost();
final Integer port = siteInfoProvider.getSiteToSitePort();
if (port == null) {
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
@@ -375,6 +372,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
public Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
final String hostname = peerDescription.getHostname();
final int port = peerDescription.getPort();
+ final URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
@@ -522,7 +520,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
@Override
public String toString() {
- return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]";
+ return "EndpointConnectionPool[Cluster URL=" + siteInfoProvider.getClusterUrls() + "]";
}
private class IdEnrichedRemoteDestination implements RemoteDestination {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index d04234f..1d3cce7 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -47,7 +47,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
super(config);
final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS);
- pool = new EndpointConnectionPool(clusterUrl,
+ pool = new EndpointConnectionPool(
createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
commsTimeout,
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 67bd75e..89da6a0 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -107,7 +107,10 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -118,6 +121,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@@ -316,7 +320,48 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
- public ControllerDTO getController() throws IOException {
+ /**
+ * Parse the clusterUrls String, and try each URL in clusterUrls one by one to get a controller resource
+ * from those remote NiFi instances until a controller is successfully returned or all URLs have been tried.
+ * When this method returns successfully, the base URL is set to the URL that succeeded.
+ * @param clusterUrls url of the remote NiFi instance, multiple urls can be specified in comma-separated format
+ * @throws IllegalArgumentException when the URLs string cannot be parsed,
+ * the URLs string mixes protocols (http and https),
+ * or no URL is specified.
+ */
+ public ControllerDTO getController(final String clusterUrls) throws IOException {
+ return getController(parseClusterUrls(clusterUrls));
+ }
+
+ /**
+ * Try each URL in clusterUrls one by one to get a controller resource
+ * from those remote NiFi instances until a controller is successfully returned or all URLs have been tried.
+ * When this method returns successfully, the base URL is set to the URL that succeeded.
+ */
+ public ControllerDTO getController(final Set<String> clusterUrls) throws IOException {
+
+ IOException lastException = null;
+ for (final String clusterUrl : clusterUrls) {
+ // The URL may not be normalized if it was passed directly without being parsed by parseClusterUrls.
+ setBaseUrl(resolveBaseUrl(clusterUrl));
+ try {
+ return getController();
+ } catch (IOException e) {
+ lastException = e;
+ logger.warn("Failed to get controller from " + clusterUrl + " due to " + e);
+ if (logger.isDebugEnabled()) {
+ logger.debug("", e);
+ }
+ }
+ }
+
+ if (clusterUrls.size() > 1) {
+ throw new IOException("Tried all cluster URLs but none of those was accessible. Last Exception was " + lastException, lastException);
+ }
+ throw lastException;
+ }
+
+ private ControllerDTO getController() throws IOException {
try {
final HttpGet get = createGetControllerRequest();
return execute(get, ControllerEntity.class).getController();
@@ -1158,15 +1203,78 @@ public class SiteToSiteRestApiClient implements Closeable {
this.readTimeoutMillis = readTimeoutMillis;
}
- public static String resolveBaseUrl(final String clusterUrl) {
+ public static String getFirstUrl(final String clusterUrlStr) {
+ if (clusterUrlStr == null) {
+ return null;
+ }
+
+ final int commaIndex = clusterUrlStr.indexOf(',');
+ if (commaIndex > -1) {
+ return clusterUrlStr.substring(0, commaIndex);
+ }
+ return clusterUrlStr;
+ }
+
+ /**
+ * Parse the comma-separated URLs string for the remote NiFi instances.
+ * @return A set containing one or more URLs
+ * @throws IllegalArgumentException when the URLs string cannot be parsed,
+ * the URLs string mixes protocols (http and https),
+ * or no URL is specified.
+ */
+ public static Set<String> parseClusterUrls(final String clusterUrlStr) {
+ final Set<String> urls = new LinkedHashSet<>();
+ if (clusterUrlStr != null && clusterUrlStr.length() > 0) {
+ Arrays.stream(clusterUrlStr.split(","))
+ .map(s -> s.trim())
+ .filter(s -> s.length() > 0)
+ .forEach(s -> {
+ validateUriString(s);
+ urls.add(resolveBaseUrl(s).intern());
+ });
+ }
+
+ if (urls.size() == 0) {
+ throw new IllegalArgumentException("Cluster URL was not specified.");
+ }
+
+ final Predicate<String> isHttps = url -> url.toLowerCase().startsWith("https:");
+ if (urls.stream().anyMatch(isHttps) && urls.stream().anyMatch(isHttps.negate())) {
+ throw new IllegalArgumentException("Different protocols are used in the cluster URLs " + clusterUrlStr);
+ }
+
+ return Collections.unmodifiableSet(urls);
+ }
+
+ private static void validateUriString(String s) {
+ // parse the uri
+ final URI uri;
+ try {
+ uri = URI.create(s);
+ } catch (final IllegalArgumentException e) {
+ throw new IllegalArgumentException("The specified remote process group URL is malformed: " + s);
+ }
+
+ // validate each part of the uri
+ if (uri.getScheme() == null || uri.getHost() == null) {
+ throw new IllegalArgumentException("The specified remote process group URL is malformed: " + s);
+ }
+
+ if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) {
+ throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + s);
+ }
+ }
+
+ private static String resolveBaseUrl(final String clusterUrl) {
Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
- URI clusterUri;
+ final URI uri;
try {
- clusterUri = new URI(clusterUrl.trim());
+ uri = new URI(clusterUrl.trim());
} catch (final URISyntaxException e) {
- throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e);
+ throw new IllegalArgumentException("The specified URL is malformed: " + clusterUrl);
}
- return resolveBaseUrl(clusterUri);
+
+ return resolveBaseUrl(uri);
}
/**
@@ -1179,7 +1287,17 @@ public class SiteToSiteRestApiClient implements Closeable {
* @param clusterUrl url to be resolved
* @return resolved url
*/
- public static String resolveBaseUrl(final URI clusterUrl) {
+ private static String resolveBaseUrl(final URI clusterUrl) {
+
+ if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) {
+ throw new IllegalArgumentException("The specified URL is malformed: " + clusterUrl);
+ }
+
+ if (!(clusterUrl.getScheme().equalsIgnoreCase("http") || clusterUrl.getScheme().equalsIgnoreCase("https"))) {
+ throw new IllegalArgumentException("The specified URL is invalid because it is not http or https: " + clusterUrl);
+ }
+
+
String uriPath = clusterUrl.getPath().trim();
if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java
new file mode 100644
index 0000000..d9ba4ed
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestSiteInfoProvider {
+
+ @Test
+ public void testSecure() throws Exception {
+
+ final Set<String> expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"https://node1:8443", "https://node2:8443"}));
+ final String expectedActiveClusterUrl = "https://node2:8443/nifi-api";
+ final SSLContext expectedSslConText = mock(SSLContext.class);
+ final HttpProxy expectedHttpProxy = mock(HttpProxy.class);
+
+ final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider());
+ siteInfoProvider.setClusterUrls(expectedClusterUrl);
+ siteInfoProvider.setSslContext(expectedSslConText);
+ siteInfoProvider.setProxy(expectedHttpProxy);
+
+ final ControllerDTO controllerDTO = new ControllerDTO();
+
+ final PortDTO inputPort1 = new PortDTO();
+ inputPort1.setName("input-one");
+ inputPort1.setId("input-0001");
+
+ final PortDTO inputPort2 = new PortDTO();
+ inputPort2.setName("input-two");
+ inputPort2.setId("input-0002");
+
+ final PortDTO outputPort1 = new PortDTO();
+ outputPort1.setName("output-one");
+ outputPort1.setId("output-0001");
+
+ final PortDTO outputPort2 = new PortDTO();
+ outputPort2.setName("output-two");
+ outputPort2.setId("output-0002");
+
+ final Set<PortDTO> inputPorts = new HashSet<>();
+ inputPorts.add(inputPort1);
+ inputPorts.add(inputPort2);
+
+ final Set<PortDTO> outputPorts = new HashSet<>();
+ outputPorts.add(outputPort1);
+ outputPorts.add(outputPort2);
+
+ controllerDTO.setInputPorts(inputPorts);
+ controllerDTO.setOutputPorts(outputPorts);
+ controllerDTO.setRemoteSiteListeningPort(8081);
+ controllerDTO.setRemoteSiteHttpListeningPort(8443);
+ controllerDTO.setSiteToSiteSecure(true);
+
+ // SiteInfoProvider uses SiteToSiteRestApiClient to get ControllerDTO.
+ doAnswer(invocation -> {
+ final SSLContext sslContext = invocation.getArgumentAt(0, SSLContext.class);
+ final HttpProxy httpProxy = invocation.getArgumentAt(1, HttpProxy.class);
+
+ assertEquals(expectedSslConText, sslContext);
+ assertEquals(expectedHttpProxy, httpProxy);
+
+ final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
+
+ when(apiClient.getController(eq(expectedClusterUrl))).thenReturn(controllerDTO);
+
+ when(apiClient.getBaseUrl()).thenReturn(expectedActiveClusterUrl);
+
+ return apiClient;
+ }).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any());
+
+ // siteInfoProvider should expose correct information of the remote NiFi cluster.
+ assertEquals(controllerDTO.getRemoteSiteListeningPort(), siteInfoProvider.getSiteToSitePort());
+ assertEquals(controllerDTO.getRemoteSiteHttpListeningPort(), siteInfoProvider.getSiteToSiteHttpPort());
+ assertEquals(controllerDTO.isSiteToSiteSecure(), siteInfoProvider.isSecure());
+ assertTrue(siteInfoProvider.isWebInterfaceSecure());
+
+ assertEquals(inputPort1.getId(), siteInfoProvider.getInputPortIdentifier(inputPort1.getName()));
+ assertEquals(inputPort2.getId(), siteInfoProvider.getInputPortIdentifier(inputPort2.getName()));
+ assertEquals(outputPort1.getId(), siteInfoProvider.getOutputPortIdentifier(outputPort1.getName()));
+ assertEquals(outputPort2.getId(), siteInfoProvider.getOutputPortIdentifier(outputPort2.getName()));
+ assertNull(siteInfoProvider.getInputPortIdentifier("not-exist"));
+ assertNull(siteInfoProvider.getOutputPortIdentifier("not-exist"));
+
+ assertEquals(inputPort1.getId(), siteInfoProvider.getPortIdentifier(inputPort1.getName(), TransferDirection.SEND));
+ assertEquals(outputPort1.getId(), siteInfoProvider.getPortIdentifier(outputPort1.getName(), TransferDirection.RECEIVE));
+
+ assertEquals(expectedActiveClusterUrl, siteInfoProvider.getActiveClusterUrl().toString());
+
+ }
+
+ @Test
+ public void testPlain() throws Exception {
+
+ final Set<String> expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"http://node1:8443, http://node2:8443"}));
+ final String expectedActiveClusterUrl = "http://node2:8443/nifi-api";
+
+ final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider());
+ siteInfoProvider.setClusterUrls(expectedClusterUrl);
+
+ final ControllerDTO controllerDTO = new ControllerDTO();
+
+ controllerDTO.setInputPorts(Collections.emptySet());
+ controllerDTO.setOutputPorts(Collections.emptySet());
+ controllerDTO.setRemoteSiteListeningPort(8081);
+ controllerDTO.setRemoteSiteHttpListeningPort(8080);
+ controllerDTO.setSiteToSiteSecure(false);
+
+ // SiteInfoProvider uses SiteToSiteRestApiClient to get ControllerDTO.
+ doAnswer(invocation -> {
+ final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
+
+ when(apiClient.getController(eq(expectedClusterUrl))).thenReturn(controllerDTO);
+
+ when(apiClient.getBaseUrl()).thenReturn(expectedActiveClusterUrl);
+
+ return apiClient;
+ }).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any());
+
+ // siteInfoProvider should expose correct information of the remote NiFi cluster.
+ assertEquals(controllerDTO.getRemoteSiteListeningPort(), siteInfoProvider.getSiteToSitePort());
+ assertEquals(controllerDTO.getRemoteSiteHttpListeningPort(), siteInfoProvider.getSiteToSiteHttpPort());
+ assertEquals(controllerDTO.isSiteToSiteSecure(), siteInfoProvider.isSecure());
+ assertFalse(siteInfoProvider.isWebInterfaceSecure());
+
+ assertEquals(expectedActiveClusterUrl, siteInfoProvider.getActiveClusterUrl().toString());
+
+ }
+
+ @Test
+ public void testConnectException() throws Exception {
+
+ final Set<String> expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"http://node1:8443, http://node2:8443"}));
+
+ final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider());
+ siteInfoProvider.setClusterUrls(expectedClusterUrl);
+
+ final ControllerDTO controllerDTO = new ControllerDTO();
+
+ controllerDTO.setInputPorts(Collections.emptySet());
+ controllerDTO.setOutputPorts(Collections.emptySet());
+ controllerDTO.setRemoteSiteListeningPort(8081);
+ controllerDTO.setRemoteSiteHttpListeningPort(8080);
+ controllerDTO.setSiteToSiteSecure(false);
+
+ // SiteInfoProvider uses SiteToSiteRestApiClient to get ControllerDTO.
+ doAnswer(invocation -> {
+ final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
+
+ when(apiClient.getController(eq(expectedClusterUrl))).thenThrow(new IOException("Connection refused."));
+
+ return apiClient;
+ }).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any());
+
+ try {
+ siteInfoProvider.getSiteToSitePort();
+ fail();
+ } catch (IOException e) {
+ }
+
+ try {
+ siteInfoProvider.getActiveClusterUrl();
+ fail();
+ } catch (IOException e) {
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index 9e76a78..1ae9f2e 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -80,6 +80,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -730,10 +731,10 @@ public class TestHttpClient {
final URI uri = server.getURI();
try (
- SiteToSiteClient client = getDefaultBuilder()
- .url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong")
- .portName("input-running")
- .build()
+ SiteToSiteClient client = getDefaultBuilder()
+ .url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong")
+ .portName("input-running")
+ .build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
@@ -812,6 +813,24 @@ public class TestHttpClient {
}
@Test
+ public void testSendSuccessMultipleUrls() throws Exception {
+
+ final Set<String> urls = new LinkedHashSet<>();
+ urls.add("http://localhost:9999");
+ urls.add("http://localhost:" + httpConnector.getLocalPort() + "/nifi");
+
+ try (
+ final SiteToSiteClient client = getDefaultBuilder()
+ .urls(urls)
+ .portName("input-running")
+ .build()
+ ) {
+ testSend(client);
+ }
+
+ }
+
+ @Test
public void testSendSuccessWithProxy() throws Exception {
try (
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index 194a167..c0b5e83 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -35,7 +35,9 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Set;
public class TestSiteToSiteClient {
@@ -128,10 +130,23 @@ public class TestSiteToSiteClient {
try {
SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class);
- Assert.assertEquals(clientConfig.getUrl(), clientConfig2.getUrl());
+ Assert.assertEquals(clientConfig.getUrls(), clientConfig2.getUrls());
} finally {
input.close();
}
}
+ @Test
+ public void testGetUrlBackwardCompatibility() {
+ final Set<String> urls = new LinkedHashSet<>();
+ urls.add("http://node1:8080/nifi");
+ urls.add("http://node2:8080/nifi");
+ final SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
+ .urls(urls)
+ .buildConfig();
+
+ Assert.assertEquals("http://node1:8080/nifi", config.getUrl());
+ Assert.assertEquals(urls, config.getUrls());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
index 0dfb90c..22b192b 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
@@ -16,119 +16,163 @@
*/
package org.apache.nifi.remote.util;
-import org.apache.nifi.events.EventReporter;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.nifi.remote.util.SiteToSiteRestApiClient.parseClusterUrls;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TestSiteToSiteRestApiClient {
+ private static void assertSingleUri(final String expected, final Set<String> urls) {
+ Assert.assertEquals(1, urls.size());
+ Assert.assertEquals(expected, urls.iterator().next().toString());
+ }
+
@Test
public void testResolveBaseUrlHttp() throws Exception{
-
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
-
- final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/nifi");
- Assert.assertEquals("http://nifi.example.com/nifi-api", baseUrl);
+ assertSingleUri("http://nifi.example.com/nifi-api", parseClusterUrls("http://nifi.example.com/nifi"));
}
@Test
public void testResolveBaseUrlHttpSub() throws Exception{
-
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
-
- final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/foo/bar/baz/nifi");
- Assert.assertEquals("http://nifi.example.com/foo/bar/baz/nifi-api", baseUrl);
+ assertSingleUri("http://nifi.example.com/foo/bar/baz/nifi-api", parseClusterUrls("http://nifi.example.com/foo/bar/baz/nifi"));
}
@Test
public void testResolveBaseUrlHttpPort() {
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
-
- final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com:8080/nifi");
- Assert.assertEquals("http://nifi.example.com:8080/nifi-api", baseUrl);
+ assertSingleUri("http://nifi.example.com:8080/nifi-api", parseClusterUrls("http://nifi.example.com:8080/nifi"));
}
@Test
public void testResolveBaseUrlHttps() throws Exception{
-
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
-
- final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com/nifi");
- Assert.assertEquals("https://nifi.example.com/nifi-api", baseUrl);
+ assertSingleUri("https://nifi.example.com/nifi-api", parseClusterUrls("https://nifi.example.com/nifi"));
}
@Test
public void testResolveBaseUrlHttpsPort() {
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
-
- final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com:8443/nifi");
- Assert.assertEquals("https://nifi.example.com:8443/nifi-api", baseUrl);
+ assertSingleUri("https://nifi.example.com:8443/nifi-api", parseClusterUrls("https://nifi.example.com:8443/nifi"));
}
@Test
public void testResolveBaseUrlLeniency() {
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
String expectedUri = "http://localhost:8080/nifi-api";
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080 "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080 "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/ "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/nifi/ "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api/"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080 "));
+ assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080 "));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi/"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi/ "));
+ assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080/nifi/ "));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi-api"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi-api/"));
expectedUri = "http://localhost/nifi-api";
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi-api"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost/"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost/nifi"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost/nifi-api"));
expectedUri = "http://localhost:8080/some/path/nifi-api";
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/some/path"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api/"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path"));
+ assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080/some/path"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path "));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi/"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi-api"));
+ assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi-api/"));
}
@Test
public void testResolveBaseUrlLeniencyHttps() {
- final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
String expectedUri = "https://localhost:8443/nifi-api";
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443 "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443 "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/ "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/nifi/ "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api/"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443 "));
+ assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443 "));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi/"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi/ "));
+ assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443/nifi/ "));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi-api"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi-api/"));
expectedUri = "https://localhost/nifi-api";
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi-api"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost/"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost/nifi"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost/nifi-api"));
expectedUri = "https://localhost:8443/some/path/nifi-api";
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/some/path"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path "));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi/"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api"));
- assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api/"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path"));
+ assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443/some/path"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path "));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi/"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi-api"));
+ assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi-api/"));
+ }
+
+ @Test
+ public void testGetUrlsEmpty() throws Exception {
+ try {
+ parseClusterUrls(null);
+ fail("Should fail if cluster URL was not specified.");
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ parseClusterUrls("");
+ fail("Should fail if cluster URL was not specified.");
+ } catch (IllegalArgumentException e) {
+ }
}
+ @Test
+ public void testGetUrlsOne() throws Exception {
+ final Set<String> urls = parseClusterUrls("http://localhost:8080/nifi");
+
+ Assert.assertEquals(1, urls.size());
+ Assert.assertEquals("http://localhost:8080/nifi-api", urls.iterator().next());
+ }
+
+ @Test
+ public void testGetUrlsThree() throws Exception {
+ final Set<String> urls = parseClusterUrls("http://host1:8080/nifi,http://host2:8080/nifi,http://host3:8080/nifi");
+
+ Assert.assertEquals(3, urls.size());
+ final Iterator<String> iterator = urls.iterator();
+ Assert.assertEquals("http://host1:8080/nifi-api", iterator.next());
+ Assert.assertEquals("http://host2:8080/nifi-api", iterator.next());
+ Assert.assertEquals("http://host3:8080/nifi-api", iterator.next());
+ }
+
+ @Test
+ public void testGetUrlsDifferentProtocols() throws Exception {
+
+ try {
+ parseClusterUrls("http://host1:8080/nifi,https://host2:8080/nifi,http://host3:8080/nifi");
+ fail("Should fail if cluster URLs contain different protocols.");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Different protocols"));
+ }
+ }
+
+ @Test
+ public void testGetUrlsMalformed() throws Exception {
+
+ try {
+ parseClusterUrls("http://host1:8080/nifi,host&2:8080,http://host3:8080/nifi");
+ fail("Should fail if cluster URLs contain illegal URL.");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("malformed"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
index 0afc1d5..df01b82 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java
@@ -31,6 +31,7 @@ import java.util.Date;
public class RemoteProcessGroupDTO extends ComponentDTO {
private String targetUri;
+ private String targetUris; // getTargetUri() treats this as a comma-separated list and takes the text before the first comma
private Boolean targetSecure;
private String name;
@@ -74,15 +75,60 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
}
/**
- * @return target uri of this remote process group
+ * @return target uri of this remote process group.
+ * If target uri is null or empty but target uris are set, then returns the text before the first
+ * comma in target uris (or all of it when there is no comma) and stores that value as the target uri.
+ * If neither target uri nor target uris are set, then returns the target uri as it is (null if it was never set).
*/
@ApiModelProperty(
- value = "The target URI of the remote process group."
+ value = "The target URI of the remote process group." +
+ " If target uri is not set, but uris are set, then returns the first url in the urls." +
+ " If neither target uri nor uris are set, then returns null."
)
public String getTargetUri() {
+ if (targetUri == null || targetUri.length() == 0) { // first check runs without the lock; NOTE(review): targetUri is not declared volatile — confirm whether concurrent access is expected
+ synchronized (this) {
+ if (targetUri == null || targetUri.length() == 0) { // same check repeated while holding the lock
+ if (targetUris != null && targetUris.length() > 0) {
+ if (targetUris.indexOf(',') > -1) {
+ targetUri = targetUris.substring(0, targetUris.indexOf(',')); // text before the first comma; NOTE(review): not trimmed, so whitespace before the comma is kept
+ } else {
+ targetUri = targetUris; // no comma: the whole string is the single uri
+ }
+ }
+ }
+ }
+ }
+
return this.targetUri;
}
+ public void setTargetUris(String targetUris) { // plain field assignment; does not update or clear targetUri
+ this.targetUris = targetUris;
+ }
+
+ /**
+ * @return target uris of this remote process group, as a single string.
+ * If target uris is null or empty, then the target uri is copied into target uris and returned.
+ * If neither target uris nor target uri were set, then returns null.
+ */
+ @ApiModelProperty( // NOTE(review): the text below says "a collection" and "nor uris", but this getter returns a String that falls back to targetUri
+ value = "The target URI of the remote process group." +
+ " If target uris is not set but target uri is set," +
+ " then returns a collection containing the single target uri." +
+ " If neither target uris nor uris are set, then returns null."
+ )
+ public String getTargetUris() {
+ if (targetUris == null || targetUris.length() == 0) { // first check runs without the lock
+ synchronized (this) {
+ if (targetUris == null || targetUris.length() == 0) { // same check repeated while holding the lock
+ targetUris = targetUri; // reads the field directly, so the fallback inside getTargetUri() is not triggered
+ }
+ }
+ }
+
+ return this.targetUris;
+ }
+
/**
* @param name of this remote process group
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java
new file mode 100644
index 0000000..ff8c61b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestRemoteProcessGroupDTO { // exercises the fallback between targetUri and targetUris on RemoteProcessGroupDTO
+
+ @Test
+ public void testGetTargetUriAndUris() {
+ final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
+
+ assertNull(dto.getTargetUri()); // nothing has been set yet
+
+ dto.setTargetUris("http://node1:8080/nifi, http://node2:8080/nifi"); // note the space after the comma
+ assertEquals("If targetUris are set but targetUri is not, it should returns the first uru of the targetUris", // NOTE(review): message contains typos ("returns", "uru")
+ "http://node1:8080/nifi", dto.getTargetUri()); // first entry is the text before the comma
+ assertEquals("http://node1:8080/nifi, http://node2:8080/nifi", dto.getTargetUris()); // returned verbatim, space included
+
+ dto.setTargetUri("http://node3:9090/nifi"); // explicit targetUri replaces the value derived above
+ assertEquals("If both targetUri and targetUris are set, each returns its own values",
+ "http://node3:9090/nifi", dto.getTargetUri());
+ assertEquals("http://node1:8080/nifi, http://node2:8080/nifi", dto.getTargetUris()); // targetUris is unaffected by setTargetUri
+
+ dto.setTargetUris(null); // clearing targetUris makes getTargetUris fall back to targetUri
+ assertEquals("http://node3:9090/nifi", dto.getTargetUri());
+ assertEquals("getTargetUris should return targetUri when it's not set",
+ "http://node3:9090/nifi", dto.getTargetUris());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 133f274..64e2ca0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -23,7 +23,6 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import java.net.URI;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -32,7 +31,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
String getIdentifier();
- URI getTargetUri();
+ String getTargetUri(); // return type changed from URI to String by this commit
+
+ String getTargetUris(); // NOTE(review): presumably the comma-separated list of all target URIs — confirm against the implementation
ProcessGroup getProcessGroup();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index f78363e..0f22f51 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1226,13 +1226,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* given URI
*
* @param id group id
- * @param uri group uri
+ * @param uris group uris; multiple urls can be specified as a comma-separated list
* @return new group
* @throws NullPointerException if either argument is null
* @throws IllegalArgumentException if <code>uri</code> is not a valid URI.
*/
- public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) {
- return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext, nifiProperties);
+ public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) { // NOTE(review): the Javadoc on this method still says NPE "if either argument is null" and refers to "uri" — confirm and update
+ return new StandardRemoteProcessGroup(requireNonNull(id).intern(), uris, null, this, sslContext, nifiProperties); // uris is forwarded as-is, without the requireNonNull/intern the removed line applied to uri
}
public ProcessGroup getRootGroup() {
@@ -1769,7 +1769,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Instantiate Remote Process Groups
//
for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
- final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri());
+ final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris());
remoteGroup.setComments(remoteGroupDTO.getComments());
remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
@@ -2608,7 +2608,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final RemoteProcessGroupStatus status = new RemoteProcessGroupStatus();
status.setGroupId(remoteGroup.getProcessGroup().getIdentifier());
status.setName(isRemoteProcessGroupAuthorized ? remoteGroup.getName() : remoteGroup.getIdentifier());
- status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri().toString() : null);
+ status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri() : null);
long lineageMillis = 0L;
int flowFilesRemoved = 0;
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 1d7ebde..d865475 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -1091,7 +1091,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final List<Element> remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup");
for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) {
final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
- final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUri());
+ final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris());
remoteGroup.setComments(remoteGroupDto.getComments());
remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition()));
final String name = remoteGroupDto.getName();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index f1e4232..6c39f16 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -252,6 +252,7 @@ public class FlowFromDOMFactory {
dto.setId(getString(element, "id"));
dto.setName(getString(element, "name"));
dto.setTargetUri(getString(element, "url"));
+ dto.setTargetUris(getString(element, "urls"));
dto.setTransmitting(getBoolean(element, "transmitting"));
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
dto.setCommunicationsTimeout(getString(element, "timeout"));
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c5bd876/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index 0ead668..b8936ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -246,7 +246,8 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "name", remoteRef.getName());
addPosition(element, remoteRef.getPosition());
addTextElement(element, "comment", remoteRef.getComments());
- addTextElement(element, "url", remoteRef.getTargetUri().toString());
+ addTextElement(element, "url", remoteRef.getTargetUri());
+ addTextElement(element, "urls", remoteRef.getTargetUris());
addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));