You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:26 UTC

[38/50] nifi git commit: NIFI-4436: Bug fix to ensure that RPG's ports are not removed until after connections are established to the ports; ensure that if a registry's name is changed that it is updated immediately in VersionControlInformation objects

NIFI-4436: Bug fix to ensure that RPG's ports are not removed until after connections are established to the ports; ensure that if a registry's name is changed that it is updated immediately in VersionControlInformation objects

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/014c542f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/014c542f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/014c542f

Branch: refs/heads/master
Commit: 014c542f48b61c027152129ae7cd8c8535dd6a64
Parents: 49aad2c
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Dec 1 16:31:22 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:55 2018 -0500

----------------------------------------------------------------------
 .../apache/nifi/groups/RemoteProcessGroup.java  |  2 +
 .../apache/nifi/controller/FlowController.java  |  1 +
 .../controller/StandardFlowSynchronizer.java    |  2 +
 .../nifi/groups/StandardProcessGroup.java       | 12 ++-
 .../registry/flow/RestBasedFlowRegistry.java    | 42 +++++++---
 .../nifi/remote/StandardRemoteProcessGroup.java | 88 ++++++++++++--------
 .../org/apache/nifi/web/NiFiServiceFacade.java  | 11 +++
 .../nifi/web/StandardNiFiServiceFacade.java     | 25 ++++++
 .../nifi/web/api/ProcessGroupResource.java      | 12 +--
 .../apache/nifi/web/api/VersionsResource.java   | 15 +++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  2 +-
 .../dao/impl/StandardRemoteProcessGroupDAO.java |  1 +
 12 files changed, 156 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 7d92246..39be045 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
 
 public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
 
+    void initialize();
+
     @Override
     String getIdentifier();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2afa9dc..158aaa2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1779,6 +1779,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      */
     public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
         instantiateSnippet(group, dto, true);
+        group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
     }
 
     private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 28d9b79..9cbf323 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -360,6 +360,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                         rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
                     }
 
+                    rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+
                     // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need
                     // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them.
                     final Document existingFlowConfiguration = parseFlowBytes(existingFlow);

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 4b186a9..fb3d3a6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -2964,6 +2964,13 @@ public final class StandardProcessGroup implements ProcessGroup {
             versionControlInformation.getStatus()) {
 
             @Override
+            public String getRegistryName() {
+                final String registryId = versionControlInformation.getRegistryIdentifier();
+                final FlowRegistry registry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
+                return registry == null ? registryId : registry.getName();
+            }
+
+            @Override
             public boolean isModified() {
                 boolean updated = false;
                 while (true) {
@@ -3220,7 +3227,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 updated = flowStatus.compareAndSet(status, updatedStatus);
             }
         } catch (final IOException | NiFiRegistryException e) {
-            final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry");
+            final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage());
             setSyncFailedState(message);
 
             LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
@@ -3451,6 +3458,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             if (childGroup == null) {
                 final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
+                added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
                 LOG.info("Added {} to {}", added, this);
             } else if (childCoordinates == null || updateDescendantVersionedGroups) {
                 updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip);
@@ -3759,7 +3767,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         final Connectable destination = getConnectable(destinationGroup, proposed.getDestination());
         if (destination == null) {
-            throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier()
+            throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId()
                 + " but no component could be found in the Process Group with a corresponding identifier");
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index 1147b9e..21e5e0c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -115,40 +115,61 @@ public class RestBasedFlowRegistry implements FlowRegistry {
         return (user == null || user.isAnonymous()) ? null : user.getIdentity();
     }
 
+    private BucketClient getBucketClient(final NiFiUser user) {
+        final String identity = getIdentity(user);
+        final NiFiRegistryClient registryClient = getRegistryClient();
+        final BucketClient bucketClient = identity == null ? registryClient.getBucketClient() : registryClient.getBucketClient(identity);
+        return bucketClient;
+    }
+
+    private FlowSnapshotClient getFlowSnapshotClient(final NiFiUser user) {
+        final String identity = getIdentity(user);
+        final NiFiRegistryClient registryClient = getRegistryClient();
+        final FlowSnapshotClient snapshotClient = identity == null ? registryClient.getFlowSnapshotClient() : registryClient.getFlowSnapshotClient(identity);
+        return snapshotClient;
+    }
+
+    private FlowClient getFlowClient(final NiFiUser user) {
+        final String identity = getIdentity(user);
+        final NiFiRegistryClient registryClient = getRegistryClient();
+        final FlowClient flowClient = identity == null ? registryClient.getFlowClient() : registryClient.getFlowClient(identity);
+        return flowClient;
+    }
+
     @Override
     public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
-        final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
+        final BucketClient bucketClient = getBucketClient(user);
         return new HashSet<>(bucketClient.getAll());
     }
 
     @Override
     public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
+        final BucketClient bucketClient = getBucketClient(user);
         return bucketClient.get(bucketId);
     }
 
 
     @Override
     public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+        final FlowClient flowClient = getFlowClient(user);
         return new HashSet<>(flowClient.getByBucket(bucketId));
     }
 
     @Override
     public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
+        final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
         return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
     }
 
     @Override
     public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+        final FlowClient flowClient = getFlowClient(user);
         return flowClient.create(flow);
     }
 
     @Override
     public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+        final FlowClient flowClient = getFlowClient(user);
         return flowClient.delete(bucketId, flowId);
     }
 
@@ -156,7 +177,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
     public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
         final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
 
-        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
+        final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
         final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
         versionedFlowSnapshot.setFlowContents(snapshot);
 
@@ -174,13 +195,14 @@ public class RestBasedFlowRegistry implements FlowRegistry {
 
     @Override
     public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount();
+        return (int) getFlowClient(user).get(bucketId, flowId).getVersionCount();
     }
 
     @Override
     public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user)
             throws IOException, NiFiRegistryException {
-        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
+
+        final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
         final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
 
         if (fetchRemoteFlows) {
@@ -241,7 +263,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
 
     @Override
     public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+        final FlowClient flowClient = getFlowClient(user);
         return flowClient.get(bucketId, flowId);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index ef05a1b..5808500 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -16,6 +16,40 @@
  */
 package org.apache.nifi.remote;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -34,7 +68,6 @@ import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@@ -51,39 +84,6 @@ import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 /**
  * Represents the Root Process Group of a remote NiFi Instance. Holds
  * information about that remote instance, as well as {@link IncomingPort}s and
@@ -104,6 +104,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     private final EventReporter eventReporter;
     private final NiFiProperties nifiProperties;
     private final long remoteContentsCacheExpiration;
+    private volatile boolean initialized = false;
 
     private final AtomicReference<String> name = new AtomicReference<>();
     private final AtomicReference<Position> position = new AtomicReference<>(new Position(0D, 0D));
@@ -179,7 +180,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
         final Runnable checkAuthorizations = new InitializationTask();
         backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true);
-        backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
+        backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void initialize() {
+        if (initialized) {
+            return;
+        }
+
+        initialized = true;
         backgroundThreadExecutor.submit(() -> {
             try {
                 refreshFlowContents();
@@ -820,6 +830,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     @Override
     public void refreshFlowContents() throws CommunicationsException {
+        if (!initialized) {
+            return;
+        }
+
         try {
             // perform the request
             final ControllerDTO dto;
@@ -1153,6 +1167,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
         @Override
         public void run() {
+            if (!initialized) {
+                return;
+            }
+
             try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
                 try {
                     final ControllerDTO dto = apiClient.getController(targetUris);

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 02df16b..be77d10 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -418,6 +418,17 @@ public interface NiFiServiceFacade {
     void verifyComponentTypes(VersionedProcessGroup versionedGroup);
 
     /**
+     * Verifies that the flow identified by the given Version Control Information can be imported into the Process Group
+     * with the given id
+     *
+     * @param versionControlInfo the information about the versioned flow
+     * @param groupId the ID of the Process Group where the flow should be instantiated
+     *
+     * @throws IllegalStateException if the flow cannot be imported into the specified group
+     */
+    void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId);
+
+    /**
      * Creates a new Template based off the specified snippet.
      *
      * @param name name

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4945296..4adb85b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1863,6 +1863,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) {
+        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
+        verifyImportProcessGroup(versionControlInfo, group);
+    }
+
+    private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) {
+        if (group == null) {
+            return;
+        }
+
+        final VersionControlInformation vci = group.getVersionControlInformation();
+        if (vci != null) {
+            if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier())
+                && Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
+                && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
+
+                throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
+                    + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
+            }
+        }
+
+        verifyImportProcessGroup(vciDto, group.getParent());
+    }
+
+    @Override
     public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
         // get the specified snippet
         final Snippet snippet = snippetDAO.getSnippet(snippetId);

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 7b753d6..a3bb5b2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -1641,6 +1641,10 @@ public class ProcessGroupResource extends ApplicationResource {
         // Step 6: Replicate the request or call serviceFacade.updateProcessGroup
 
         final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
+        if (versionControlInfo != null) {
+            serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId);
+        }
+
         if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
             // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
             // Step 2: Retrieve flow from Flow Registry
@@ -1685,12 +1689,8 @@ public class ProcessGroupResource extends ApplicationResource {
                         }
                     }
                 },
-                () -> {
-                    final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
-                    if (versionedFlowSnapshot != null) {
-                        serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
-                    }
-                },
+            () -> {
+            },
                 processGroupEntity -> {
                     final ProcessGroupDTO processGroup = processGroupEntity.getComponent();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 245713e..6dd641b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -424,15 +424,24 @@ public class VersionsResource extends ApplicationResource {
         if (versionedFlowDto == null) {
             throw new IllegalArgumentException("Version Control Information must be supplied.");
         }
-        if (versionedFlowDto.getBucketId() == null) {
+        if (StringUtils.isEmpty(versionedFlowDto.getBucketId())) {
             throw new IllegalArgumentException("The Bucket ID must be supplied.");
         }
-        if (versionedFlowDto.getFlowName() == null && versionedFlowDto.getFlowId() == null) {
+        if (StringUtils.isEmpty(versionedFlowDto.getFlowName()) && StringUtils.isEmpty(versionedFlowDto.getFlowId())) {
             throw new IllegalArgumentException("The Flow Name or Flow ID must be supplied.");
         }
-        if (versionedFlowDto.getRegistryId() == null) {
+        if (versionedFlowDto.getFlowName().length() > 1000) {
+            throw new IllegalArgumentException("The Flow Name cannot exceed 1,000 characters");
+        }
+        if (StringUtils.isEmpty(versionedFlowDto.getRegistryId())) {
             throw new IllegalArgumentException("The Registry ID must be supplied.");
         }
+        if (versionedFlowDto.getDescription() != null && versionedFlowDto.getDescription().length() > 65535) {
+            throw new IllegalArgumentException("Flow Description cannot exceed 65,535 characters");
+        }
+        if (versionedFlowDto.getComments() != null && versionedFlowDto.getComments().length() > 65535) {
+            throw new IllegalArgumentException("Comments cannot exceed 65,535 characters");
+        }
 
         // ensure we're not attempting to version the root group
         final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowController.ROOT_GROUP_ID_ALIAS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 8e5974a..7a1442d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1792,7 +1792,7 @@ public final class DtoFactory {
         componentDto.setId(processorDto.getId());
         componentDto.setName(processorDto.getName());
         componentDto.setProcessGroupId(processorDto.getParentGroupId());
-        componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+        componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
         componentDto.setState(processorDto.getState());
         componentDto.setValidationErrors(processorDto.getValidationErrors());
         component.setComponent(componentDto);

http://git-wip-us.apache.org/repos/asf/nifi/blob/014c542f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index c570dfc..941aae0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -85,6 +85,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
 
         // create the remote process group
         RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
+        remoteProcessGroup.initialize();
 
         // set other properties
         updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);