You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:26 UTC
[38/50] nifi git commit: NIFI-4436: Bug fix to ensure that RPG's
ports are not removed until after connections are established to the ports;
ensure that if a registry's name is changed that it is updated immediately in
VersionControlInformation objects
NIFI-4436: Bug fix to ensure that RPG's ports are not removed until after connections are established to the ports; ensure that if a registry's name is changed that it is updated immediately in VersionControlInformation objects
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/014c542f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/014c542f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/014c542f
Branch: refs/heads/master
Commit: 014c542f48b61c027152129ae7cd8c8535dd6a64
Parents: 49aad2c
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Dec 1 16:31:22 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:55 2018 -0500
----------------------------------------------------------------------
.../apache/nifi/groups/RemoteProcessGroup.java | 2 +
.../apache/nifi/controller/FlowController.java | 1 +
.../controller/StandardFlowSynchronizer.java | 2 +
.../nifi/groups/StandardProcessGroup.java | 12 ++-
.../registry/flow/RestBasedFlowRegistry.java | 42 +++++++---
.../nifi/remote/StandardRemoteProcessGroup.java | 88 ++++++++++++--------
.../org/apache/nifi/web/NiFiServiceFacade.java | 11 +++
.../nifi/web/StandardNiFiServiceFacade.java | 25 ++++++
.../nifi/web/api/ProcessGroupResource.java | 12 +--
.../apache/nifi/web/api/VersionsResource.java | 15 +++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 2 +-
.../dao/impl/StandardRemoteProcessGroupDAO.java | 1 +
12 files changed, 156 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 7d92246..39be045 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
+ void initialize();
+
@Override
String getIdentifier();
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2afa9dc..158aaa2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1779,6 +1779,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
instantiateSnippet(group, dto, true);
+ group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
}
private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 28d9b79..9cbf323 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -360,6 +360,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
}
+ rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+
// If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need
// to ensure that we also add those to the appropriate Process Groups, so that we don't lose them.
final Document existingFlowConfiguration = parseFlowBytes(existingFlow);
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 4b186a9..fb3d3a6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -2964,6 +2964,13 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.getStatus()) {
@Override
+ public String getRegistryName() {
+ final String registryId = versionControlInformation.getRegistryIdentifier();
+ final FlowRegistry registry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
+ return registry == null ? registryId : registry.getName();
+ }
+
+ @Override
public boolean isModified() {
boolean updated = false;
while (true) {
@@ -3220,7 +3227,7 @@ public final class StandardProcessGroup implements ProcessGroup {
updated = flowStatus.compareAndSet(status, updatedStatus);
}
} catch (final IOException | NiFiRegistryException e) {
- final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry");
+ final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage());
setSyncFailedState(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
@@ -3451,6 +3458,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
+ added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, this);
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip);
@@ -3759,7 +3767,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Connectable destination = getConnectable(destinationGroup, proposed.getDestination());
if (destination == null) {
- throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier()
+ throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId()
+ " but no component could be found in the Process Group with a corresponding identifier");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index 1147b9e..21e5e0c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -115,40 +115,61 @@ public class RestBasedFlowRegistry implements FlowRegistry {
return (user == null || user.isAnonymous()) ? null : user.getIdentity();
}
+ private BucketClient getBucketClient(final NiFiUser user) {
+ final String identity = getIdentity(user);
+ final NiFiRegistryClient registryClient = getRegistryClient();
+ final BucketClient bucketClient = identity == null ? registryClient.getBucketClient() : registryClient.getBucketClient(identity);
+ return bucketClient;
+ }
+
+ private FlowSnapshotClient getFlowSnapshotClient(final NiFiUser user) {
+ final String identity = getIdentity(user);
+ final NiFiRegistryClient registryClient = getRegistryClient();
+ final FlowSnapshotClient snapshotClient = identity == null ? registryClient.getFlowSnapshotClient() : registryClient.getFlowSnapshotClient(identity);
+ return snapshotClient;
+ }
+
+ private FlowClient getFlowClient(final NiFiUser user) {
+ final String identity = getIdentity(user);
+ final NiFiRegistryClient registryClient = getRegistryClient();
+ final FlowClient flowClient = identity == null ? registryClient.getFlowClient() : registryClient.getFlowClient(identity);
+ return flowClient;
+ }
+
@Override
public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
- final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
+ final BucketClient bucketClient = getBucketClient(user);
return new HashSet<>(bucketClient.getAll());
}
@Override
public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
- final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
+ final BucketClient bucketClient = getBucketClient(user);
return bucketClient.get(bucketId);
}
@Override
public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
- final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+ final FlowClient flowClient = getFlowClient(user);
return new HashSet<>(flowClient.getByBucket(bucketId));
}
@Override
public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
- final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
+ final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
}
@Override
public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException {
- final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+ final FlowClient flowClient = getFlowClient(user);
return flowClient.create(flow);
}
@Override
public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
- final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+ final FlowClient flowClient = getFlowClient(user);
return flowClient.delete(bucketId, flowId);
}
@@ -156,7 +177,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
- final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
+ final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setFlowContents(snapshot);
@@ -174,13 +195,14 @@ public class RestBasedFlowRegistry implements FlowRegistry {
@Override
public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
- return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount();
+ return (int) getFlowClient(user).get(bucketId, flowId).getVersionCount();
}
@Override
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user)
throws IOException, NiFiRegistryException {
- final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
+
+ final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
if (fetchRemoteFlows) {
@@ -241,7 +263,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
@Override
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
- final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+ final FlowClient flowClient = getFlowClient(user);
return flowClient.get(bucketId, flowId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index ef05a1b..5808500 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -16,6 +16,40 @@
*/
package org.apache.nifi.remote;
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.Response;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
@@ -34,7 +68,6 @@ import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@@ -51,39 +84,6 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
* information about that remote instance, as well as {@link IncomingPort}s and
@@ -104,6 +104,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
private final long remoteContentsCacheExpiration;
+ private volatile boolean initialized = false;
private final AtomicReference<String> name = new AtomicReference<>();
private final AtomicReference<Position> position = new AtomicReference<>(new Position(0D, 0D));
@@ -179,7 +180,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true);
- backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
+ backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void initialize() {
+ if (initialized) {
+ return;
+ }
+
+ initialized = true;
backgroundThreadExecutor.submit(() -> {
try {
refreshFlowContents();
@@ -820,6 +830,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void refreshFlowContents() throws CommunicationsException {
+ if (!initialized) {
+ return;
+ }
+
try {
// perform the request
final ControllerDTO dto;
@@ -1153,6 +1167,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void run() {
+ if (!initialized) {
+ return;
+ }
+
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
try {
final ControllerDTO dto = apiClient.getController(targetUris);
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 02df16b..be77d10 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -418,6 +418,17 @@ public interface NiFiServiceFacade {
void verifyComponentTypes(VersionedProcessGroup versionedGroup);
/**
+ * Verifies that the flow identified by the given Version Control Information can be imported into the Process Group
+ * with the given id
+ *
+ * @param versionControlInfo the information about the versioned flow
+ * @param groupId the ID of the Process Group where the flow should be instantiated
+ *
+ * @throws IllegalStateException if the flow cannot be imported into the specified group
+ */
+ void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId);
+
+ /**
* Creates a new Template based off the specified snippet.
*
* @param name name
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4945296..4adb85b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1863,6 +1863,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+ verifyImportProcessGroup(versionControlInfo, group);
+ }
+
+ private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) {
+ if (group == null) {
+ return;
+ }
+
+ final VersionControlInformation vci = group.getVersionControlInformation();
+ if (vci != null) {
+ if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier())
+ && Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
+ && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
+
+ throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
+ + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
+ }
+ }
+
+ verifyImportProcessGroup(vciDto, group.getParent());
+ }
+
+ @Override
public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
// get the specified snippet
final Snippet snippet = snippetDAO.getSnippet(snippetId);
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 7b753d6..a3bb5b2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -1641,6 +1641,10 @@ public class ProcessGroupResource extends ApplicationResource {
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
+ if (versionControlInfo != null) {
+ serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId);
+ }
+
if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
@@ -1685,12 +1689,8 @@ public class ProcessGroupResource extends ApplicationResource {
}
}
},
- () -> {
- final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
- if (versionedFlowSnapshot != null) {
- serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
- }
- },
+ () -> {
+ },
processGroupEntity -> {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 245713e..6dd641b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -424,15 +424,24 @@ public class VersionsResource extends ApplicationResource {
if (versionedFlowDto == null) {
throw new IllegalArgumentException("Version Control Information must be supplied.");
}
- if (versionedFlowDto.getBucketId() == null) {
+ if (StringUtils.isEmpty(versionedFlowDto.getBucketId())) {
throw new IllegalArgumentException("The Bucket ID must be supplied.");
}
- if (versionedFlowDto.getFlowName() == null && versionedFlowDto.getFlowId() == null) {
+ if (StringUtils.isEmpty(versionedFlowDto.getFlowName()) && StringUtils.isEmpty(versionedFlowDto.getFlowId())) {
throw new IllegalArgumentException("The Flow Name or Flow ID must be supplied.");
}
- if (versionedFlowDto.getRegistryId() == null) {
+ if (versionedFlowDto.getFlowName().length() > 1000) {
+ throw new IllegalArgumentException("The Flow Name cannot exceed 1,000 characters");
+ }
+ if (StringUtils.isEmpty(versionedFlowDto.getRegistryId())) {
throw new IllegalArgumentException("The Registry ID must be supplied.");
}
+ if (versionedFlowDto.getDescription() != null && versionedFlowDto.getDescription().length() > 65535) {
+ throw new IllegalArgumentException("Flow Description cannot exceed 65,535 characters");
+ }
+ if (versionedFlowDto.getComments() != null && versionedFlowDto.getComments().length() > 65535) {
+ throw new IllegalArgumentException("Comments cannot exceed 65,535 characters");
+ }
// ensure we're not attempting to version the root group
final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowController.ROOT_GROUP_ID_ALIAS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 8e5974a..7a1442d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1792,7 +1792,7 @@ public final class DtoFactory {
componentDto.setId(processorDto.getId());
componentDto.setName(processorDto.getName());
componentDto.setProcessGroupId(processorDto.getParentGroupId());
- componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
componentDto.setState(processorDto.getState());
componentDto.setValidationErrors(processorDto.getValidationErrors());
component.setComponent(componentDto);
http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index c570dfc..941aae0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -85,6 +85,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
// create the remote process group
RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
+ remoteProcessGroup.initialize();
// set other properties
updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);