You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:23 UTC
[35/50] nifi git commit: NIFI-4436: Bug fixes;
ensure correct Exception types are thrown
NIFI-4436: Bug fixes; ensure correct Exception types are thrown
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/181d6809
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/181d6809
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/181d6809
Branch: refs/heads/master
Commit: 181d6809c126da862417e79fb5d794ed5f8eefac
Parents: 1266235
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Dec 11 15:36:56 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:55 2018 -0500
----------------------------------------------------------------------
.../manager/ProcessGroupEntityMerger.java | 3 +
.../org/apache/nifi/groups/ProcessGroup.java | 2 +-
.../apache/nifi/registry/flow/FlowRegistry.java | 1 -
.../apache/nifi/controller/FlowController.java | 14 ++-
.../nifi/groups/StandardProcessGroup.java | 50 +++++++---
.../flow/StandardFlowRegistryClient.java | 11 ++-
.../java/org/apache/nifi/util/SnippetUtils.java | 99 ++++++++++++++++++++
.../StandardFlowSynchronizerSpec.groovy | 2 +
.../service/mock/MockProcessGroup.java | 2 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 22 ++---
.../nifi/web/StandardNiFiServiceFacade.java | 72 +++++++++-----
.../nifi/web/api/ProcessGroupResource.java | 9 +-
.../apache/nifi/web/api/VersionsResource.java | 4 +-
.../dao/impl/StandardControllerServiceDAO.java | 30 +++---
.../web/dao/impl/StandardProcessGroupDAO.java | 2 +-
.../org/apache/nifi/web/util/SnippetUtils.java | 1 -
16 files changed, 246 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
index d2eb749..d74fdeb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
@@ -48,6 +48,9 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr
private void mergeVersionControlInformation(ProcessGroupEntity targetGroup, ProcessGroupEntity toMerge) {
final ProcessGroupDTO targetGroupDto = targetGroup.getComponent();
final ProcessGroupDTO toMergeGroupDto = toMerge.getComponent();
+ if (targetGroupDto == null || toMergeGroupDto == null) {
+ return;
+ }
final VersionControlInformationDTO targetVersionControl = targetGroupDto.getVersionControlInformation();
final VersionControlInformationDTO toMergeVersionControl = toMergeGroupDto.getVersionControlInformation();
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 17131dd..3f32580 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -965,7 +965,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
/**
* Disconnects this Process Group from version control. If not currently under version control, this method does nothing.
*/
- void disconnectVersionControl();
+ void disconnectVersionControl(boolean removeVersionedComponentIds);
/**
* Synchronizes the Process Group with the given Flow Registry, determining whether or not the local flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index ae43bb5..b883274 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -212,6 +212,5 @@ public interface FlowRegistry {
* @throws IOException if unable to communicate with the Flow Registry
* @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
*/
- // TODO: Do we still need this?
VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 158aaa2..f7c2545 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -234,6 +234,7 @@ import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
@@ -2372,26 +2373,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* </p>
*
* @param group group
- * @param templateContents contents
+ * @param snippetContents contents
*/
- private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) {
+ private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO snippetContents) {
// validate the names of Input Ports
- for (final PortDTO port : templateContents.getInputPorts()) {
+ for (final PortDTO port : snippetContents.getInputPorts()) {
if (group.getInputPortByName(port.getName()) != null) {
throw new IllegalStateException("One or more of the proposed Port names is not available in the process group");
}
}
// validate the names of Output Ports
- for (final PortDTO port : templateContents.getOutputPorts()) {
+ for (final PortDTO port : snippetContents.getOutputPorts()) {
if (group.getOutputPortByName(port.getName()) != null) {
throw new IllegalStateException("One or more of the proposed Port names is not available in the process group");
}
}
- verifyComponentTypesInSnippet(templateContents);
+ verifyComponentTypesInSnippet(snippetContents);
+
+ SnippetUtils.verifyNoVersionControlConflicts(snippetContents, group);
}
+
/**
* Recursively finds all ConnectionDTO's
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 7d184df..5d5d0f4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -112,6 +112,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
@@ -2294,6 +2295,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
verifyContents(snippet);
verifyDestinationNotInSnippet(snippet, destination);
+ SnippetUtils.verifyNoVersionControlConflicts(snippet, this, destination);
if (!isDisconnected(snippet)) {
throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
@@ -3087,13 +3089,15 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void disconnectVersionControl() {
+ public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
writeLock.lock();
try {
this.versionControlInfo.set(null);
- // remove version component ids from each component (until another versioned PG is encountered)
- applyVersionedComponentIds(this, id -> null);
+ if (removeVersionedComponentIds) {
+ // remove version component ids from each component (until another versioned PG is encountered)
+ applyVersionedComponentIds(this, id -> null);
+ }
} finally {
writeLock.unlock();
}
@@ -3278,7 +3282,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<String> knownVariables = getKnownVariableNames();
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables);
} catch (final ProcessorInstantiationException pie) {
- throw new RuntimeException(pie);
+ throw new IllegalStateException("Failed to update flow", pie);
} finally {
writeLock.unlock();
}
@@ -3366,7 +3370,9 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVariables(updatedVariableMap);
final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
- if (remoteCoordinates != null) {
+ if (remoteCoordinates == null) {
+ group.disconnectVersionControl(false);
+ } else {
final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
final String bucketId = remoteCoordinates.getBucketId();
final String flowId = remoteCoordinates.getFlowId();
@@ -3681,8 +3687,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
- // TODO: I think we can get rid of all of those LinkedHashSet's now in the VersionedProcessGroup because
- /// the UUID is properly keyed off of the ID of the component in the VersionedProcessGroup.
long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
UUID uuid;
@@ -3733,7 +3737,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
return flowController.createPrioritizer(prioritizerName);
} catch (final Exception e) {
- throw new RuntimeException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
+ throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
}
})
.collect(Collectors.toList());
@@ -4016,7 +4020,14 @@ public final class StandardProcessGroup implements ProcessGroup {
// The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this
// to the instance ID of the Controller Service.
final String serviceVersionedComponentId = entry.getValue();
- final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
+ String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
+ if (instanceId == null) {
+ // We didn't find the instance ID based on the Versioned Component ID. So we want to just
+ // leave the value set to whatever it currently is, if it's currently set.
+ final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build();
+ instanceId = currentProperties.get(propertyDescriptor);
+ }
+
value = instanceId == null ? serviceVersionedComponentId : instanceId;
} else {
value = entry.getValue();
@@ -4169,15 +4180,22 @@ public final class StandardProcessGroup implements ProcessGroup {
+ " reverted to its original form before changing the version.");
}
}
+
+ verifyNoDescendantsWithLocalModifications("be updated");
}
final VersionedProcessGroup flowContents = updatedFlow.getFlowContents();
if (verifyConnectionRemoval) {
// Determine which Connections have been removed.
final Map<String, Connection> removedConnectionByVersionedId = new HashMap<>();
+
+ // Populate the 'removedConnectionByVersionId' map with all Connections. We key off of the connection's VersionedComponentID
+ // if it is populated. Otherwise, we key off of its actual ID. We do this because it allows us to then remove from this Map
+ // any connection that does exist in the proposed flow. This results in us having a Map whose values are those Connections
+ // that were removed. We can then check for any connections that have data in them. If any Connection is to be removed but
+ // has data, then we should throw an IllegalStateException.
findAllConnections().stream()
- .filter(conn -> conn.getVersionedComponentId().isPresent())
- .forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().get(), conn));
+ .forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().orElse(conn.getIdentifier()), conn));
final Set<String> proposedFlowConnectionIds = new HashSet<>();
findAllConnectionIds(flowContents, proposedFlowConnectionIds);
@@ -4252,8 +4270,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (!proposedProcessGroups.containsKey(versionedId)) {
// Process Group was removed.
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup
- + " that exists locally has one or more Templates, and the proposed flow does not contain this Process Group. "
- + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to chnage the version of the flow.");
+ + " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
+ + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to change the version of the flow.");
}
}
@@ -4430,6 +4448,12 @@ public final class StandardProcessGroup implements ProcessGroup {
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
}
+
+ if (flowState == VersionedFlowState.SYNC_FAILURE) {
+ throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ + "is not synchronized with the Flow Registry. Each descendant Process Group must first be synchronized with the Flow Registry before this action can be "
+ + "performed on the parent Process Group. NiFi will continue to attempt to communicate with the Flow Registry periodically in the background.");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
index 8a2447d..4f98a2b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
@@ -43,6 +43,13 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
@Override
public void addFlowRegistry(final FlowRegistry registry) {
+ final boolean duplicateName = registryById.values().stream()
+ .anyMatch(reg -> reg.getName().equals(registry.getName()));
+
+ if (duplicateName) {
+ throw new IllegalStateException("Cannot add Flow Registry because a Flow Registry already exists with the name " + registry.getName());
+ }
+
final FlowRegistry existing = registryById.putIfAbsent(registry.getIdentifier(), registry);
if (existing != null) {
throw new IllegalStateException("Cannot add Flow Registry " + registry + " because a Flow Registry already exists with the ID " + registry.getIdentifier());
@@ -58,7 +65,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
- throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
+ throw new IllegalStateException("Failed to create Flow Registry for URI " + registryUrl
+ " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+ "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
}
@@ -68,7 +75,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
} else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
- throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
+ throw new IllegalStateException("Failed to create Flow Registry for URI " + registryUrl
+ " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+ "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java
index b482169..9c04559 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java
@@ -16,11 +16,15 @@
*/
package org.apache.nifi.util;
+import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import java.util.ArrayList;
import java.util.Collection;
@@ -307,4 +311,99 @@ public final class SnippetUtils {
connection.setBends(bends);
}
}
+
+ public static void verifyNoVersionControlConflicts(final Snippet snippet, final ProcessGroup parentGroup, final ProcessGroup destination) {
+ if (snippet == null) {
+ return;
+ }
+ if (snippet.getProcessGroups() == null) {
+ return;
+ }
+
+ final List<VersionControlInformation> vcis = new ArrayList<>();
+ for (final String groupId : snippet.getProcessGroups().keySet()) {
+ final ProcessGroup group = parentGroup.getProcessGroup(groupId);
+ if (group != null) {
+ findAllVersionControlInfo(group, vcis);
+ }
+ }
+
+ verifyNoDuplicateVersionControlInfo(destination, vcis);
+ }
+
+ public static void verifyNoVersionControlConflicts(final FlowSnippetDTO snippetContents, final ProcessGroup destination) {
+ final List<VersionControlInformationDTO> vcis = new ArrayList<>();
+ for (final ProcessGroupDTO childGroup : snippetContents.getProcessGroups()) {
+ findAllVersionControlInfo(childGroup, vcis);
+ }
+
+ verifyNoDuplicateVersionControlInfoDtos(destination, vcis);
+ }
+
+ private static void verifyNoDuplicateVersionControlInfoDtos(final ProcessGroup group, final Collection<VersionControlInformationDTO> snippetVcis) {
+ final VersionControlInformation vci = group.getVersionControlInformation();
+ if (vci != null) {
+ for (final VersionControlInformationDTO snippetVci : snippetVcis) {
+ if (vci.getBucketIdentifier().equals(snippetVci.getBucketId()) && vci.getFlowIdentifier().equals(snippetVci.getFlowId())) {
+ throw new IllegalArgumentException("Cannot place the given Process Group into the desired destination because the destination group or one of its ancestor groups is "
+ + "under Version Control and one of the selected Process Groups is also under Version Control with the same Flow. A Process Group that is under Version Control "
+ + "cannot contain a child Process Group that points to the same Versioned Flow.");
+ }
+ }
+ }
+
+ final ProcessGroup parent = group.getParent();
+ if (parent != null) {
+ verifyNoDuplicateVersionControlInfoDtos(parent, snippetVcis);
+ }
+ }
+
+ private static void verifyNoDuplicateVersionControlInfo(final ProcessGroup group, final Collection<VersionControlInformation> snippetVcis) {
+ final VersionControlInformation vci = group.getVersionControlInformation();
+ if (vci != null) {
+ for (final VersionControlInformation snippetVci : snippetVcis) {
+ if (vci.getBucketIdentifier().equals(snippetVci.getBucketIdentifier()) && vci.getFlowIdentifier().equals(snippetVci.getFlowIdentifier())) {
+ throw new IllegalArgumentException("Cannot place the given Process Group into the desired destination because the destination group or one of its ancestor groups is "
+ + "under Version Control and one of the selected Process Groups is also under Version Control with the same Flow. A Process Group that is under Version Control "
+ + "cannot contain a child Process Group that points to the same Versioned Flow.");
+ }
+ }
+ }
+
+ final ProcessGroup parent = group.getParent();
+ if (parent != null) {
+ verifyNoDuplicateVersionControlInfo(parent, snippetVcis);
+ }
+ }
+
+
+ private static void findAllVersionControlInfo(final ProcessGroupDTO dto, final List<VersionControlInformationDTO> found) {
+ final VersionControlInformationDTO vci = dto.getVersionControlInformation();
+ if (vci != null) {
+ found.add(vci);
+ }
+
+ final FlowSnippetDTO contents = dto.getContents();
+ if (contents != null) {
+ for (final ProcessGroupDTO child : contents.getProcessGroups()) {
+ findAllVersionControlInfo(child, found);
+ }
+ }
+ }
+
+ private static void findAllVersionControlInfo(final ProcessGroup group, final List<VersionControlInformation> found) {
+ if (group == null) {
+ return;
+ }
+
+ final VersionControlInformation vci = group.getVersionControlInformation();
+ if (vci != null) {
+ found.add(vci);
+ }
+
+ for (final ProcessGroup childGroup : group.findAllProcessGroups()) {
+ findAllVersionControlInfo(childGroup, found);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
index 7483228..897d77e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
@@ -127,6 +127,8 @@ class StandardFlowSynchronizerSpec extends Specification {
}
}
}
+ _ * processGroup.findAllRemoteProcessGroups() >> []
+
positionableMocksById.put(pgId, processGroup)
return processGroup
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index d006cff..95e2d6a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -667,7 +667,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public void disconnectVersionControl() {
+ public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
this.versionControlInfo = null;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 514cd18..6c20eac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -23,7 +23,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
@@ -117,7 +116,6 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
-import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -423,11 +421,12 @@ public interface NiFiServiceFacade {
* with the given id
*
* @param versionControlInfo the information about the versioned flow
+ * @param versionedProcessGroup the contents to be imported
* @param groupId the ID of the Process Group where the flow should be instantiated
*
* @throws IllegalStateException if the flow cannot be imported into the specified group
*/
- void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId);
+ void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup versionedProcessGroup, String groupId);
/**
* Creates a new Template based off the specified snippet.
@@ -1295,7 +1294,7 @@ public interface NiFiServiceFacade {
* was last synchronized with the Flow Registry
* @throws IllegalStateException if the Process Group with the given ID is not under version control
*/
- FlowComparisonEntity getLocalModifications(String processGroupId) throws IOException, NiFiRegistryException;
+ FlowComparisonEntity getLocalModifications(String processGroupId);
/**
* Returns the Version Control information for the Process Group with the given ID
@@ -1314,9 +1313,9 @@ public interface NiFiServiceFacade {
* @param flow the flow to add to the registry
* @return a VersionedFlow that is fully populated, including identifiers
*
- * @throws IOException if unable to communicate with the Flow Registry
+ * @throws NiFiCoreException if unable to register flow
*/
- VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, NiFiRegistryException;
+ VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow);
/**
* Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry
@@ -1337,7 +1336,7 @@ public interface NiFiServiceFacade {
* @param flowId the ID of the flow
* @return the VersionedFlow that was deleted
*/
- VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId) throws IOException, NiFiRegistryException;
+ VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId);
/**
* Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
@@ -1349,10 +1348,9 @@ public interface NiFiServiceFacade {
* @param expectedVersion the version to save the flow as
* @return the snapshot that represents what was stored in the registry
*
- * @throws IOException if unable to communicate with the Flow Registry
+ * @throws NiFiCoreException if unable to register the snapshot with the flow registry
*/
- VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion)
- throws IOException, NiFiRegistryException;
+ VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion);
/**
* Updates the Version Control Information on the Process Group with the given ID
@@ -1386,7 +1384,7 @@ public interface NiFiServiceFacade {
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
*/
- VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException;
+ VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows);
/**
* Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
@@ -1406,7 +1404,7 @@ public interface NiFiServiceFacade {
* @param user the user making the request
* @return the set of all components that would be affected by updating the Process Group
*/
- Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user) throws IOException;
+ Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user);
/**
* Verifies that the Process Group with the given identifier can be updated to the proposed flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index a2d6e41..11bc39d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -95,6 +95,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
@@ -1866,20 +1867,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) {
+ public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- verifyImportProcessGroup(versionControlInfo, group);
+ verifyImportProcessGroup(versionControlInfo, contents, group);
}
- private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) {
+ private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) {
if (group == null) {
return;
}
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
- if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier())
- && Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
+ // Note that we do not compare the Registry ID here because there could be two registry clients
+ // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance)..
+ if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier())
&& Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) {
throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
@@ -1887,7 +1889,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
}
- verifyImportProcessGroup(vciDto, group.getParent());
+ final Set<VersionedProcessGroup> childGroups = contents.getProcessGroups();
+ if (childGroups != null) {
+ for (final VersionedProcessGroup childGroup : childGroups) {
+ final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates();
+ if (childCoordinates != null) {
+ final VersionControlInformationDTO childVci = new VersionControlInformationDTO();
+ childVci.setBucketId(childCoordinates.getBucketId());
+ childVci.setFlowId(childCoordinates.getFlowId());
+ verifyImportProcessGroup(childVci, childGroup, group);
+ }
+ }
+ }
+
+ verifyImportProcessGroup(vciDto, contents, group.getParent());
}
@Override
@@ -3447,8 +3462,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies()));
entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem()));
entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents()));
-
- // TODO - update to be user specific
entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers()));
return entity;
@@ -3722,13 +3735,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+ public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
}
- return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
+ try {
+ return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e);
+ }
}
@Override
@@ -3752,7 +3769,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public FlowComparisonEntity getLocalModifications(final String processGroupId) throws IOException, NiFiRegistryException {
+ public FlowComparisonEntity getLocalModifications(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
if (versionControlInfo == null) {
@@ -3765,11 +3782,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
+ " but cannot find a Flow Registry with that identifier");
}
- final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
- versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser());
+ final VersionedFlowSnapshot versionedFlowSnapshot;
+ try {
+ versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
+ versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);
+ }
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
- final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
+ final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
@@ -3802,13 +3824,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException {
+ public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
}
- return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
+ try {
+ return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
+ }
}
private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
@@ -3822,13 +3848,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
- final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) throws IOException, NiFiRegistryException {
+ final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
}
- return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
+ try {
+ return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
+ } catch (final IOException | NiFiRegistryException e) {
+ throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
+ }
}
@Override
@@ -3881,7 +3911,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException {
+ public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
@@ -4057,7 +4087,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException {
+ public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
@@ -4066,7 +4096,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
- } catch (final NiFiRegistryException e) {
+ } catch (final NiFiRegistryException | IOException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index b3ccefb..a2c16ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -1644,10 +1644,6 @@ public class ProcessGroupResource extends ApplicationResource {
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
- if (versionControlInfo != null) {
- serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId);
- }
-
if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
@@ -1670,6 +1666,11 @@ public class ProcessGroupResource extends ApplicationResource {
requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot);
}
+ if (versionControlInfo != null) {
+ final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
+ serviceFacade.verifyImportProcessGroup(versionControlInfo, flowSnapshot.getFlowContents(), groupId);
+ }
+
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessGroupEntity);
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 3090c6e..1d4cd88 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -1110,7 +1110,7 @@ public class VersionsResource extends ApplicationResource {
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
- final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
+ final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Affected Processors");
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
@@ -1275,7 +1275,7 @@ public class VersionsResource extends ApplicationResource {
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
- final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
+ final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Affected Processors");
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 4d8e984..5622097 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -172,21 +172,23 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
}
}
- controllerService.getProcessGroup().onComponentModified();
-
- // For any component that references this Controller Service, find the component's Process Group
- // and notify the Process Group that a component has been modified. This way, we know to re-calculate
- // whether or not the Process Group has local modifications.
final ProcessGroup group = controllerService.getProcessGroup();
- controllerService.getReferences().getReferencingComponents().stream()
- .map(ConfiguredComponent::getProcessGroupIdentifier)
- .filter(id -> !id.equals(group.getIdentifier()))
- .forEach(groupId -> {
- final ProcessGroup descendant = group.findProcessGroup(groupId);
- if (descendant != null) {
- descendant.onComponentModified();
- }
- });
+ if (group != null) {
+ group.onComponentModified();
+
+ // For any component that references this Controller Service, find the component's Process Group
+ // and notify the Process Group that a component has been modified. This way, we know to re-calculate
+ // whether or not the Process Group has local modifications.
+ controllerService.getReferences().getReferencingComponents().stream()
+ .map(ConfiguredComponent::getProcessGroupIdentifier)
+ .filter(id -> !id.equals(group.getIdentifier()))
+ .forEach(groupId -> {
+ final ProcessGroup descendant = group.findProcessGroup(groupId);
+ if (descendant != null) {
+ descendant.onComponentModified();
+ }
+ });
+ }
return controllerService;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 52a18dc..e7e85af 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -274,7 +274,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup disconnectVersionControl(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
- group.disconnectVersionControl();
+ group.disconnectVersionControl(true);
return group;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/181d6809/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index 11bd1b8..2dbd1ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -90,7 +90,6 @@ public final class SnippetUtils {
private DtoFactory dtoFactory;
private AccessPolicyDAO accessPolicyDAO;
-
/**
* Populates the specified snippet and returns the details.
*