You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:16 UTC
[28/50] nifi git commit: NIFI-4436: More intelligently flag a
ProcessGroup to indicate whether or not it has any local modifications
compared to Versioned Flow - Bug fixes - Updated to include status of a
Versioned Process Group to include VersionedFlowState and explanation
NIFI-4436: More intelligently flag a ProcessGroup to indicate whether or not it has any local modifications compared to Versioned Flow - Bug fixes - Updated to include status of a Versioned Process Group to include VersionedFlowState and explanation
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fdef5b56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fdef5b56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fdef5b56
Branch: refs/heads/master
Commit: fdef5b560544a8da33068b4acb6d4404fe193ed9
Parents: d34fb5e
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Nov 28 12:33:00 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:54 2018 -0500
----------------------------------------------------------------------
.../api/dto/VersionControlInformationDTO.java | 22 ++
.../service/ControllerServiceNode.java | 3 +-
.../org/apache/nifi/groups/ProcessGroup.java | 13 +-
.../apache/nifi/groups/RemoteProcessGroup.java | 9 +-
.../flow/VersionControlInformation.java | 5 +
.../nifi/registry/flow/VersionedFlowState.java | 52 +++
.../nifi/registry/flow/VersionedFlowStatus.java | 31 ++
.../apache/nifi/controller/FlowController.java | 76 ++--
.../controller/StandardFlowSynchronizer.java | 5 +-
.../nifi/groups/StandardProcessGroup.java | 382 +++++++++++++++++--
.../groups/StandardVersionedFlowStatus.java | 50 +++
.../flow/StandardVersionControlInformation.java | 17 +-
.../flow/mapping/NiFiRegistryDtoMapper.java | 328 ----------------
.../flow/mapping/NiFiRegistryFlowMapper.java | 97 +++--
.../nifi/remote/StandardRemoteProcessGroup.java | 126 ++----
.../service/mock/MockProcessGroup.java | 6 +-
.../nifi/web/StandardNiFiServiceFacade.java | 31 +-
.../nifi/web/api/ProcessGroupResource.java | 2 +-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 29 +-
.../nifi/web/controller/ControllerFacade.java | 5 +
.../dao/impl/StandardControllerServiceDAO.java | 20 +-
.../nifi/web/dao/impl/StandardFunnelDAO.java | 2 +
.../nifi/web/dao/impl/StandardInputPortDAO.java | 1 +
.../nifi/web/dao/impl/StandardLabelDAO.java | 1 +
.../web/dao/impl/StandardOutputPortDAO.java | 1 +
.../web/dao/impl/StandardProcessGroupDAO.java | 13 +-
.../nifi/web/dao/impl/StandardProcessorDAO.java | 1 +
.../dao/impl/StandardRemoteProcessGroupDAO.java | 5 +-
.../nifi/web/util/AffectedComponentUtils.java | 4 +
.../ClusterReplicationComponentLifecycle.java | 6 +-
30 files changed, 751 insertions(+), 592 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
index c31a957..944b10a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
@@ -34,6 +34,8 @@ public class VersionControlInformationDTO {
private Integer version;
private Boolean modified;
private Boolean current;
+ private String state;
+ private String stateExplanation;
@ApiModelProperty("The ID of the Process Group that is under version control")
public String getGroupId() {
@@ -135,4 +137,24 @@ public class VersionControlInformationDTO {
public void setCurrent(Boolean current) {
this.current = current;
}
+
+ /**
+ * @return the current state of the Process Group, as it relates to the Versioned Flow
+ */
+ // The allowable values are aligned with the constants declared by
+ // org.apache.nifi.registry.flow.VersionedFlowState (SYNC_FAILURE was missing, and
+ // LOCALLY_MODIFIED_DESCENDANT is not a constant of that enum).
+ // NOTE(review): assumes the value stored here is the name of a VersionedFlowState constant;
+ // the code that populates this DTO is not visible in this change - confirm.
+ @ApiModelProperty(readOnly = true,
+ value = "The current state of the Process Group, as it relates to the Versioned Flow",
+ allowableValues = "LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE, SYNC_FAILURE")
+ public String getState() {
+ return state;
+ }
+
+ /**
+ * Sets the state of the Process Group, as it relates to the Versioned Flow.
+ *
+ * @param state the state to expose through {@link #getState()}
+ */
+ public void setState(final String state) {
+ this.state = state;
+ }
+
+ /**
+ * @return the explanation of why the group is in the state returned by {@link #getState()}
+ */
+ @ApiModelProperty(readOnly = true, value = "Explanation of why the group is in the specified state")
+ public String getStateExplanation() {
+ return stateExplanation;
+ }
+
+ /**
+ * Sets the explanation of why the group is in its current state.
+ *
+ * @param explanation the explanation to expose through {@link #getStateExplanation()}
+ */
+ public void setStateExplanation(final String explanation) {
+ this.stateExplanation = explanation;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 2f28963..2219d6d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.service;
+import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
@@ -27,7 +28,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
-public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent {
+public interface ControllerServiceNode extends ConfiguredComponent, ConfigurableComponent, VersionedComponent {
/**
* @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index d81b7d3..17131dd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -462,11 +462,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
/**
* @param id of the Controller Service
- * @return the Controller Service with the given ID, if it exists as a child or
- * descendant of this ProcessGroup. This performs a recursive search of all
- * descendant ProcessGroups
+ * @param includeDescendantGroups whether or not to include descendant process groups
+ * @param includeAncestorGroups whether or not to include ancestor process groups
+ * @return the Controller Service with the given ID
*/
- ControllerServiceNode findControllerService(String id);
+ ControllerServiceNode findControllerService(String id, boolean includeDescendantGroups, boolean includeAncestorGroups);
/**
* @return a List of all Controller Services contained within this ProcessGroup and any child Process Groups
@@ -976,4 +976,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param flowRegistry the Flow Registry to synchronize with
*/
void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry);
+
+ /**
+ * Notifies this Process Group that a component within it, or the group itself, has been modified.
+ * NOTE(review): presumably implementations use this notification to re-evaluate whether the group
+ * has local modifications relative to its Versioned Flow - confirm against the implementing class.
+ */
+ void onComponentModified();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 0dd6070..7d92246 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -61,9 +61,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
void setName(String name);
- void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+ void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean pruneUnusedPorts);
- void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+ void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean pruneUnusedPorts);
Set<RemoteGroupPort> getInputPorts();
@@ -216,11 +216,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
void reinitialize(boolean isClustered);
/**
- * Removes all non existent ports from this RemoteProcessGroup.
- */
- void removeAllNonExistentPorts();
-
- /**
* Removes a port that no longer exists on the remote instance from this
* RemoteProcessGroup
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
index b54a1c9..1f65a19 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
@@ -77,6 +77,11 @@ public interface VersionControlInformation {
boolean isCurrent();
/**
+ * @return the current status of the Process Group as it relates to the associated Versioned Flow.
+ */
+ VersionedFlowStatus getStatus();
+
+ /**
* @return the snapshot of the flow that was synchronized with the Flow Registry
*/
VersionedProcessGroup getFlowSnapshot();
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
new file mode 100644
index 0000000..d20a13f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+/**
+ * The states that a version-controlled Process Group can be in, relative to the Versioned Flow
+ * that is stored in the Flow Registry.
+ */
+public enum VersionedFlowState {
+
+ /**
+ * The appropriate state cannot be determined because we are unable to communicate with the
+ * Flow Registry.
+ */
+ SYNC_FAILURE,
+
+ /**
+ * This Process Group (or a child/descendant Process Group that is not itself under Version Control)
+ * is on the latest version of the Versioned Flow, but differs from the Versioned Flow that is
+ * stored in the Flow Registry.
+ */
+ LOCALLY_MODIFIED,
+
+ /**
+ * This Process Group has not been modified since it was last synchronized with the Flow Registry, but
+ * the Flow Registry has a newer version of the flow than what is contained in this Process Group.
+ */
+ STALE,
+
+ /**
+ * This Process Group (or a child/descendant Process Group that is not itself under Version Control)
+ * has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has
+ * a newer version of the flow than what is contained in this Process Group.
+ */
+ LOCALLY_MODIFIED_AND_STALE,
+
+ /**
+ * This Process Group and all child/descendant Process Groups are on the latest version of the flow in
+ * the Flow Registry and have no local modifications.
+ */
+ UP_TO_DATE;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
new file mode 100644
index 0000000..9b58d9a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+/**
+ * Describes the status of a versioned Process Group: the {@link VersionedFlowState} that the group
+ * is in, together with an explanation of why it is in that state.
+ */
+public interface VersionedFlowStatus {
+
+ /**
+ * @return the current state of the versioned process group
+ */
+ VersionedFlowState getState();
+
+ /**
+ * @return an explanation of why the process group is in the state that it is in
+ */
+ String getStateExplanation();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3909387..2afa9dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,39 @@
*/
package org.apache.nifi.controller;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -169,7 +202,6 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -225,38 +257,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
@@ -1983,14 +1983,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (remoteGroupDTO.getContents() != null) {
final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
- // ensure there input ports
+ // ensure there are input ports
if (contents.getInputPorts() != null) {
- remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()));
+ remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
}
// ensure there are output ports
if (contents.getOutputPorts() != null) {
- remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()));
+ remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
}
}
@@ -2035,12 +2035,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
instantiateSnippet(childGroup, childTemplateDTO, false);
if (groupDTO.getVersionControlInformation() != null) {
- final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper();
- final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false);
-
final VersionControlInformation vci = StandardVersionControlInformation.Builder
.fromDto(groupDTO.getVersionControlInformation())
- .flowSnapshot(versionedGroup)
.build();
childGroup.setVersionControlInformation(vci, Collections.emptyMap());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 71a587c..28d9b79 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -931,6 +931,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel());
label.setStyle(labelDTO.getStyle());
label.setPosition(new Position(labelDTO.getPosition().getX(), labelDTO.getPosition().getY()));
+ label.setVersionedComponentId(labelDTO.getVersionedComponentId());
if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
}
@@ -1327,13 +1328,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) {
inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
}
- remoteGroup.setInputPorts(inputPorts);
+ remoteGroup.setInputPorts(inputPorts, false);
final Set<RemoteProcessGroupPortDescriptor> outputPorts = new HashSet<>();
for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "outputPort")) {
outputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
}
- remoteGroup.setOutputPorts(outputPorts);
+ remoteGroup.setOutputPorts(outputPorts, false);
processGroup.addRemoteProcessGroup(remoteGroup);
for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : outputPorts) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 9a14464..4b186a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -83,11 +83,14 @@ import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
@@ -166,6 +169,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, Template> templates = new HashMap<>();
private final StringEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
+ private final AtomicReference<StandardVersionedFlowStatus> flowStatus = new AtomicReference<>(
+ new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null));
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -494,6 +499,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
flowController.onInputPortAdded(port);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -528,6 +534,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
}
+ onComponentModified();
+
flowController.onInputPortRemoved(port);
LOG.info("Input Port {} removed from flow", port);
} finally {
@@ -575,6 +583,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
flowController.onOutputPortAdded(port);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -600,6 +609,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
}
+ onComponentModified();
+
flowController.onOutputPortRemoved(port);
LOG.info("Output Port {} removed from flow", port);
} finally {
@@ -640,6 +651,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
flowController.onProcessGroupAdded(group);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -679,6 +691,8 @@ public final class StandardProcessGroup implements ProcessGroup {
removeComponents(group);
processGroups.remove(group.getIdentifier());
+ onComponentModified();
+
flowController.onProcessGroupRemoved(group);
LOG.info("{} removed from flow", group);
} finally {
@@ -734,6 +748,7 @@ public final class StandardProcessGroup implements ProcessGroup {
remoteGroup.setProcessGroup(this);
remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -767,6 +782,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+ onComponentModified();
+
for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) {
// must copy to avoid a concurrent modification
final Set<Connection> copy = new HashSet<>(port.getConnections());
@@ -802,6 +819,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor);
flowController.onProcessorAdded(processor);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -843,6 +861,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
processors.remove(id);
+ onComponentModified();
+
flowController.onProcessorRemoved(processor);
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
@@ -912,6 +932,7 @@ public final class StandardProcessGroup implements ProcessGroup {
writeLock.lock();
try {
connections.put(connection.getIdentifier(), connection);
+ onComponentModified();
connection.setProcessGroup(this);
} finally {
writeLock.unlock();
@@ -983,6 +1004,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
connections.put(connection.getIdentifier(), connection);
flowController.onConnectionAdded(connection);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -1042,6 +1064,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// remove the connection from our map
connections.remove(connection.getIdentifier());
LOG.info("{} removed from flow", connection);
+ onComponentModified();
+
flowController.onConnectionRemoved(connection);
} finally {
writeLock.unlock();
@@ -1109,6 +1133,7 @@ public final class StandardProcessGroup implements ProcessGroup {
label.setProcessGroup(this);
labels.put(label.getIdentifier(), label);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -1123,6 +1148,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(label + " is not a member of this Process Group.");
}
+ onComponentModified();
LOG.info("Label with ID {} removed from flow", label.getIdentifier());
} finally {
writeLock.unlock();
@@ -1828,6 +1854,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (autoStart) {
startFunnel(funnel);
}
+
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -1859,18 +1887,43 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
- public ControllerServiceNode findControllerService(final String id) {
- return findControllerService(id, this);
+ public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) {
+ // Search this group first: either this group together with its descendants, or this group alone.
+ final ControllerServiceNode localMatch = includeDescendants
+ ? findDescendantControllerService(id, this)
+ : getControllerService(id);
+
+ // The ancestor chain is consulted only when nothing was found above and the caller asked for it.
+ if (localMatch != null || !includeAncestors) {
+ return localMatch;
+ }
+
+ return findAncestorControllerService(id, getParent());
+ }
+
+ /**
+ * Searches the given group and then each successive parent group for a Controller Service.
+ *
+ * @param id the ID of the Controller Service to look for
+ * @param start the group at which to begin the search; may be <code>null</code>
+ * @return the first matching Controller Service found while walking up the parent chain, or
+ * <code>null</code> if the chain is exhausted without a match
+ */
+ private ControllerServiceNode findAncestorControllerService(final String id, final ProcessGroup start) {
+ ProcessGroup candidateGroup = start;
+ while (candidateGroup != null) {
+ final ControllerServiceNode match = candidateGroup.getControllerService(id);
+ if (match != null) {
+ return match;
+ }
+
+ candidateGroup = candidateGroup.getParent();
+ }
+
+ return null;
+ }
- private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) {
+ private ControllerServiceNode findDescendantControllerService(final String id, final ProcessGroup start) {
ControllerServiceNode service = start.getControllerService(id);
if (service != null) {
return service;
}
for (final ProcessGroup group : start.getProcessGroups()) {
- service = findControllerService(id, group);
+ service = findDescendantControllerService(id, group);
if (service != null) {
return service;
}
@@ -1916,6 +1969,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
funnels.remove(funnel.getIdentifier());
+ onComponentModified();
+
flowController.onFunnelRemoved(funnel);
LOG.info("{} removed from flow", funnel);
} finally {
@@ -1947,6 +2002,7 @@ public final class StandardProcessGroup implements ProcessGroup {
service.getVariableRegistry().setParent(getVariableRegistry());
this.controllerServices.put(service.getIdentifier(), service);
LOG.info("{} added to {}", service, this);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -2010,6 +2066,21 @@ public final class StandardProcessGroup implements ProcessGroup {
}
controllerServices.remove(service.getIdentifier());
+ onComponentModified();
+
+ // For any component that references this Controller Service, find the component's Process Group
+ // and notify the Process Group that a component has been modified. This way, we know to re-calculate
+ // whether or not the Process Group has local modifications.
+ service.getReferences().getReferencingComponents().stream()
+ .map(ConfiguredComponent::getProcessGroupIdentifier)
+ .filter(id -> !id.equals(getIdentifier()))
+ .forEach(groupId -> {
+ final ProcessGroup descendant = findProcessGroup(groupId);
+ if (descendant != null) {
+ descendant.onComponentModified();
+ }
+ });
+
flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
removed = true;
@@ -2043,6 +2114,7 @@ public final class StandardProcessGroup implements ProcessGroup {
templates.put(id, template);
template.setProcessGroup(this);
LOG.info("{} added to {}", template, this);
+ onComponentModified();
} finally {
writeLock.unlock();
}
@@ -2112,6 +2184,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
templates.remove(template.getIdentifier());
+ onComponentModified();
+
LOG.info("{} removed from flow", template);
} finally {
writeLock.unlock();
@@ -2172,6 +2246,8 @@ public final class StandardProcessGroup implements ProcessGroup {
toRemove.verifyCanDelete(true);
}
+ onComponentModified();
+
for (final String id : connectionIdsToRemove) {
removeConnection(connections.get(id));
}
@@ -2224,6 +2300,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("Cannot move Ports into the root group");
}
+ onComponentModified();
+
for (final String id : getKeys(snippet.getInputPorts())) {
destination.addInputPort(inputPorts.remove(id));
}
@@ -2845,6 +2923,34 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
+ public void onComponentModified() {
+ // We no longer know if or how the Process Group has changed, so the next time that we
+ // get the local modifications, we must re-calculate it. We cannot simply assume that
+ // the flow was modified now, because if a Processor Property changed from 'A' to 'B',
+ // then back to 'A', then we have to know that it was not modified. So we set the cached
+ // differences to null to indicate that we must re-calculate the local modifications.
+ final StandardVersionControlInformation svci = this.versionControlInfo.get();
+ if (svci == null) {
+ // This group is not under version control directly. Notify parent.
+ final ProcessGroup parentGroup = parent.get();
+ if (parentGroup != null) {
+ parentGroup.onComponentModified();
+ }
+ }
+
+ clearFlowDifferences();
+ }
+
+ private void clearFlowDifferences() {
+ boolean updated = false;
+ while (!updated) {
+ final StandardVersionedFlowStatus status = flowStatus.get();
+ final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null);
+ updated = flowStatus.compareAndSet(status, updatedStatus);
+ }
+ }
+
+ @Override
public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
final StandardVersionControlInformation svci = new StandardVersionControlInformation(
versionControlInformation.getRegistryIdentifier(),
@@ -2854,16 +2960,63 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.getVersion(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
versionControlInformation.isModified(),
- versionControlInformation.isCurrent()) {
+ versionControlInformation.isCurrent(),
+ versionControlInformation.getStatus()) {
@Override
public boolean isModified() {
- final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications();
- if (differences == null) {
- return false;
+ boolean updated = false;
+ while (true) {
+ final StandardVersionedFlowStatus status = flowStatus.get();
+ Set<FlowDifference> differences = status.getCurrentDifferences();
+ if (differences == null) {
+ differences = getModifications();
+ if (differences == null) {
+ return false;
+ }
+
+ final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences);
+ updated = flowStatus.compareAndSet(status, updatedStatus);
+
+ if (updated) {
+ return !differences.isEmpty();
+ }
+
+ continue;
+ }
+
+ return !differences.isEmpty();
+ }
+ }
+
+ @Override
+ public VersionedFlowStatus getStatus() {
+ // If current state is a sync failure, then return that status as-is instead of re-computing it.
+ final StandardVersionedFlowStatus status = flowStatus.get();
+ final VersionedFlowState state = status.getState();
+ if (state == VersionedFlowState.SYNC_FAILURE) {
+ return status;
}
- return !differences.isEmpty();
+ final boolean modified = isModified();
+ if (!modified) {
+ final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
+ if (vci.getFlowSnapshot() == null) {
+ return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null);
+ }
+ }
+
+ final boolean stale = !isCurrent();
+
+ if (modified && stale) {
+ return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null);
+ } else if (modified) {
+ return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null);
+ } else if (stale) {
+ return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null);
+ } else {
+ return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null);
+ }
}
};
@@ -2875,6 +3028,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
updateVersionedComponentIds(this, versionedComponentIds);
this.versionControlInfo.set(svci);
+ clearFlowDifferences();
} finally {
writeLock.unlock();
}
@@ -2901,6 +3055,7 @@ public final class StandardProcessGroup implements ProcessGroup {
copy.setProcessors(processGroup.getProcessors());
copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups());
copy.setVariables(processGroup.getVariables());
+ copy.setLabels(processGroup.getLabels());
final Set<VersionedProcessGroup> copyChildren = new HashSet<>();
@@ -2944,8 +3099,22 @@ public final class StandardProcessGroup implements ProcessGroup {
}
applyVersionedComponentIds(processGroup, versionedComponentIds::get);
+
+ // If we versioned any parent groups' Controller Services, set their versioned component ids too.
+ final ProcessGroup parent = processGroup.getParent();
+ if (parent != null) {
+ for (final ControllerServiceNode service : parent.getControllerServices(true)) {
+ if (!service.getVersionedComponentId().isPresent()) {
+ final String versionedId = versionedComponentIds.get(service.getIdentifier());
+ if (versionedId != null) {
+ service.setVersionedComponentId(versionedId);
+ }
+ }
+ }
+ }
}
+
private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
@@ -2980,6 +3149,14 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
+ private void setSyncFailedState(final String explanation) {
+ boolean updated = false;
+ while (!updated) {
+ final StandardVersionedFlowStatus status = flowStatus.get();
+ final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences());
+ updated = flowStatus.compareAndSet(status, updatedStatus);
+ }
+ }
@Override
public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
@@ -2991,6 +3168,10 @@ public final class StandardProcessGroup implements ProcessGroup {
final String registryId = vci.getRegistryIdentifier();
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
if (flowRegistry == null) {
+ final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ + "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
+ setSyncFailedState(message);
+
LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
return;
@@ -3005,8 +3186,12 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | NiFiRegistryException e) {
+ final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s",
+ vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
+ setSyncFailedState(message);
+
LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
- new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e);
+ this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
return;
}
}
@@ -3027,7 +3212,17 @@ public final class StandardProcessGroup implements ProcessGroup {
LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
new Object[] {this, vci.getVersion(), latestVersion});
}
+
+ boolean updated = false;
+ while (!updated) {
+ final StandardVersionedFlowStatus status = flowStatus.get();
+ final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences());
+ updated = flowStatus.compareAndSet(status, updatedStatus);
+ }
} catch (final IOException | NiFiRegistryException e) {
+ final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry");
+ setSyncFailedState(message);
+
LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
}
}
@@ -3041,12 +3236,12 @@ public final class StandardProcessGroup implements ProcessGroup {
verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
- final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true);
+ final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
- final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor());
+ final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<String> updatedVersionedComponentIds = new HashSet<>();
@@ -3055,6 +3250,25 @@ public final class StandardProcessGroup implements ProcessGroup {
continue;
}
+ // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
+ // and if so compare our VersionedControllerService to the existing service.
+ if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
+ final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
+ if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) {
+ final ControllerServiceNode serviceNode = getVersionedControllerService(this, component.getIdentifier());
+ if (serviceNode != null) {
+ final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, controllerServiceProvider);
+ final Set<FlowDifference> differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component);
+
+ if (!differences.isEmpty()) {
+ updatedVersionedComponentIds.add(component.getIdentifier());
+ }
+
+ continue;
+ }
+ }
+ }
+
final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
updatedVersionedComponentIds.add(component.getIdentifier());
@@ -3081,6 +3295,35 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+ private Set<String> getAncestorGroupServiceIds() {
+ final Set<String> ancestorServiceIds;
+ ProcessGroup parentGroup = getParent();
+
+ if (parentGroup == null) {
+ ancestorServiceIds = Collections.emptySet();
+ } else {
+ ancestorServiceIds = parentGroup.getControllerServices(true).stream()
+ .map(ControllerServiceNode::getIdentifier)
+ .collect(Collectors.toSet());
+ }
+
+ return ancestorServiceIds;
+ }
+
+ private ControllerServiceNode getVersionedControllerService(final ProcessGroup group, final String versionedComponentId) {
+ if (group == null) {
+ return null;
+ }
+
+ for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
+ if (serviceNode.getVersionedComponentId().isPresent() && serviceNode.getVersionedComponentId().get().equals(versionedComponentId)) {
+ return serviceNode;
+ }
+ }
+
+ return getVersionedControllerService(group.getParent(), versionedComponentId);
+ }
+
private Set<String> getKnownVariableNames() {
final Set<String> variableNames = new HashSet<>();
populateKnownVariableNames(this, variableNames);
@@ -3159,6 +3402,44 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVersionControlInformation(vci, Collections.emptyMap());
}
+
+ // Controller Services
+ // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
+ // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
+ // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
+ // Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
+ // properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
+ final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
+ .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+
+ final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
+
+ final Map<ControllerServiceNode, VersionedControllerService> services = new HashMap<>();
+
+ // Add any Controller Service that does not yet exist.
+ for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
+ ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
+ if (service == null) {
+ service = addControllerService(group, proposedService, componentIdSeed);
+ LOG.info("Added {} to {}", service, this);
+ }
+
+ services.put(service, proposedService);
+ }
+
+ // Update all of the Controller Services to match the VersionedControllerService
+ for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
+ final ControllerServiceNode service = entry.getKey();
+ final VersionedControllerService proposedService = entry.getValue();
+
+ if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
+ updateControllerService(service, proposedService);
+ LOG.info("Updated {}", service);
+ }
+
+ controllerServicesRemoved.remove(proposedService.getIdentifier());
+ }
+
// Child groups
final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
@@ -3179,26 +3460,6 @@ public final class StandardProcessGroup implements ProcessGroup {
childGroupsRemoved.remove(proposedChildGroup.getIdentifier());
}
-
- // Controller Services
- final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
- .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
- final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
-
- for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
- final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
- if (service == null) {
- final ControllerServiceNode added = addControllerService(group, proposedService, componentIdSeed);
- LOG.info("Added {} to {}", added, this);
- } else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
- updateControllerService(service, proposedService);
- LOG.info("Updated {}", service);
- }
-
- controllerServicesRemoved.remove(proposedService.getIdentifier());
- }
-
-
// Funnels
final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
@@ -3608,7 +3869,7 @@ public final class StandardProcessGroup implements ProcessGroup {
service.setAnnotationData(proposed.getAnnotationData());
service.setComments(proposed.getComments());
service.setName(proposed.getName());
- service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties()));
+ service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup()));
if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
@@ -3728,7 +3989,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
processor.setName(proposed.getName());
processor.setPenalizationPeriod(proposed.getPenaltyDuration());
- processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties()));
+ processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup()));
processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
@@ -3745,19 +4006,60 @@ public final class StandardProcessGroup implements ProcessGroup {
}
- private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
+ private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties,
+ final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {
+
final Map<String, String> fullPropertyMap = new HashMap<>();
for (final PropertyDescriptor property : currentProperties.keySet()) {
fullPropertyMap.put(property.getName(), null);
}
if (proposedProperties != null) {
- fullPropertyMap.putAll(proposedProperties);
+ for (final Map.Entry<String, String> entry : proposedProperties.entrySet()) {
+ final String propertyName = entry.getKey();
+ final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName);
+
+ String value;
+ if (descriptor != null && descriptor.getIdentifiesControllerService()) {
+ // Property identifies a Controller Service. So the value that we want to assign is not the value given.
+ // The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this
+ // to the instance ID of the Controller Service.
+ final String serviceVersionedComponentId = entry.getValue();
+ final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
+ value = instanceId == null ? serviceVersionedComponentId : instanceId;
+ } else {
+ value = entry.getValue();
+ }
+
+ fullPropertyMap.put(propertyName, value);
+ }
}
return fullPropertyMap;
}
+ private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
+ for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
+ final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId();
+ if (!optionalVersionedId.isPresent()) {
+ continue;
+ }
+
+ final String versionedId = optionalVersionedId.get();
+ if (versionedId.equals(serviceVersionedComponentId)) {
+ return serviceNode.getIdentifier();
+ }
+ }
+
+ final ProcessGroup parent = group.getParent();
+ if (parent == null) {
+ return null;
+ }
+
+ return getServiceInstanceId(serviceVersionedComponentId, parent);
+
+ }
+
private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
rpg.setVersionedComponentId(proposed.getIdentifier());
@@ -3773,12 +4075,12 @@ public final class StandardProcessGroup implements ProcessGroup {
rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
- .collect(Collectors.toSet()));
+ .collect(Collectors.toSet()), false);
rpg.setName(proposed.getName());
rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
- .collect(Collectors.toSet()));
+ .collect(Collectors.toSet()), false);
rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
rpg.setProxyHost(proposed.getProxyHost());
rpg.setProxyPort(proposed.getProxyPort());
@@ -3831,12 +4133,12 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
- final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
+ final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
- final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor());
+ final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences();
final Set<FlowDifference> functionalDifferences = differences.stream()
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
new file mode 100644
index 0000000..f362c1e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import java.util.Set;
+
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+
+class StandardVersionedFlowStatus implements VersionedFlowStatus {
+ private final VersionedFlowState state;
+ private final String explanation;
+ private final Set<FlowDifference> currentDifferences;
+
+ StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set<FlowDifference> differences) {
+ this.state = state;
+ this.explanation = explanation;
+ this.currentDifferences = differences;
+ }
+
+ @Override
+ public VersionedFlowState getState() {
+ return state;
+ }
+
+ @Override
+ public String getStateExplanation() {
+ return explanation;
+ }
+
+ Set<FlowDifference> getCurrentDifferences() {
+ return currentDifferences;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
index 92a4166..106d19a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
@@ -34,6 +34,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
private volatile VersionedProcessGroup flowSnapshot;
private volatile boolean modified;
private volatile boolean current;
+ private final VersionedFlowStatus status;
public static class Builder {
private String registryIdentifier;
@@ -47,6 +48,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
private VersionedProcessGroup flowSnapshot;
private Boolean modified = null;
private Boolean current = null;
+ private VersionedFlowStatus status;
public Builder registryId(String registryId) {
this.registryIdentifier = registryId;
@@ -103,6 +105,11 @@ public class StandardVersionControlInformation implements VersionControlInformat
return this;
}
+ public Builder status(final VersionedFlowStatus status) {
+ this.status = status;
+ return this;
+ }
+
public static Builder fromDto(VersionControlInformationDTO dto) {
Builder builder = new Builder();
builder.registryId(dto.getRegistryId())
@@ -126,7 +133,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
Objects.requireNonNull(version, "Version must be specified");
final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName,
- bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current);
+ bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status);
svci.setBucketName(bucketName);
svci.setFlowName(flowName);
@@ -138,7 +145,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
- final VersionedProcessGroup snapshot, final boolean modified, final boolean current) {
+ final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) {
this.registryIdentifier = registryId;
this.registryName = registryName;
this.bucketIdentifier = bucketId;
@@ -147,6 +154,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
this.flowSnapshot = snapshot;
this.modified = modified;
this.current = current;
+ this.status = status;
}
@@ -232,4 +240,9 @@ public class StandardVersionControlInformation implements VersionControlInformat
public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) {
this.flowSnapshot = flowSnapshot;
}
+
+ @Override
+ public VersionedFlowStatus getStatus() {
+ return status;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
deleted file mode 100644
index 193bde8..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.registry.flow.mapping;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import org.apache.nifi.registry.flow.BatchSize;
-import org.apache.nifi.registry.flow.Bundle;
-import org.apache.nifi.registry.flow.ComponentType;
-import org.apache.nifi.registry.flow.ConnectableComponent;
-import org.apache.nifi.registry.flow.ConnectableComponentType;
-import org.apache.nifi.registry.flow.ControllerServiceAPI;
-import org.apache.nifi.registry.flow.PortType;
-import org.apache.nifi.registry.flow.Position;
-import org.apache.nifi.registry.flow.VersionedConnection;
-import org.apache.nifi.registry.flow.VersionedControllerService;
-import org.apache.nifi.registry.flow.VersionedFunnel;
-import org.apache.nifi.registry.flow.VersionedLabel;
-import org.apache.nifi.registry.flow.VersionedPort;
-import org.apache.nifi.registry.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.VersionedProcessor;
-import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
-import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
-import org.apache.nifi.web.api.dto.BatchSettingsDTO;
-import org.apache.nifi.web.api.dto.BundleDTO;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceApiDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-
-
-/**
- * Converts NiFi web API DTOs into the NiFi Registry "versioned" flow model objects.
- *
- * <p>While mapping, an instance records which versioned identifier was chosen for each component id.
- * That record lives in an unsynchronized {@link HashMap} field and is cleared at the start of every
- * {@link #mapProcessGroup(ProcessGroupDTO)} call.</p>
- */
-public class NiFiRegistryDtoMapper {
-    // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
-    // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
-    // identifier based on the component's actual id. We do connections last, so that all components will already have been
-    // created before attempting to create the connection, where the ConnectableDTO is converted.
-    private final Map<String, String> versionedComponentIds = new HashMap<>();
-
-    /**
-     * Maps a process group, including everything nested inside it, after discarding any component id
-     * mappings recorded by a previous call.
-     *
-     * @param dto the process group to convert
-     * @return the versioned representation of the group
-     */
-    public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) {
-        versionedComponentIds.clear();
-        return mapGroup(dto);
-    }
-
-    /**
-     * Maps a single process group and recurses into its child groups. Each kind of component is collected
-     * into a {@link LinkedHashSet}, so the iteration order of the DTO collections is retained.
-     *
-     * @param dto the process group to convert
-     * @return the versioned representation of the group
-     */
-    private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) {
-        final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
-        versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        versionedGroup.setName(dto.getName());
-        versionedGroup.setComments(dto.getComments());
-        versionedGroup.setPosition(mapPosition(dto.getPosition()));
-
-        final FlowSnippetDTO contents = dto.getContents();
-
-        versionedGroup.setControllerServices(contents.getControllerServices().stream()
-            .map(this::mapControllerService)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setFunnels(contents.getFunnels().stream()
-            .map(this::mapFunnel)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setInputPorts(contents.getInputPorts().stream()
-            .map(this::mapPort)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setOutputPorts(contents.getOutputPorts().stream()
-            .map(this::mapPort)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setLabels(contents.getLabels().stream()
-            .map(this::mapLabel)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setProcessors(contents.getProcessors().stream()
-            .map(this::mapProcessor)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream()
-            .map(this::mapRemoteProcessGroup)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        versionedGroup.setProcessGroups(contents.getProcessGroups().stream()
-            .map(this::mapGroup)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        // Connections go last: mapConnectable resolves endpoints through the ids recorded while the
-        // components above were being mapped.
-        versionedGroup.setConnections(contents.getConnections().stream()
-            .map(this::mapConnection)
-            .collect(Collectors.toCollection(LinkedHashSet::new)));
-
-        return versionedGroup;
-    }
-
-    /**
-     * Chooses the versioned identifier for a component and records it against the component's id.
-     *
-     * @param currentVersionedId the versioned id the component already carries, or {@code null} if it has none
-     * @param componentId the component's own id
-     * @return {@code currentVersionedId} when non-null, otherwise a name-based UUID computed from the UTF-8
-     *         bytes of {@code componentId}
-     */
-    private String getId(final String currentVersionedId, final String componentId) {
-        final String versionedId;
-        if (currentVersionedId == null) {
-            versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
-        } else {
-            versionedId = currentVersionedId;
-        }
-
-        versionedComponentIds.put(componentId, versionedId);
-        return versionedId;
-    }
-
-    /**
-     * Looks up the versioned identifier previously recorded for a group id.
-     *
-     * @param groupId the id of the group to look up
-     * @return the recorded versioned id, or {@code null} if nothing has been recorded for {@code groupId}
-     */
-    private String getGroupId(final String groupId) {
-        return versionedComponentIds.get(groupId);
-    }
-
-    /**
-     * Maps a connection, including its bend points and both of its endpoints.
-     *
-     * @param dto the connection to convert
-     * @return the versioned representation of the connection
-     * @throws IllegalArgumentException if an endpoint carries no versioned id and none has been recorded for it
-     */
-    public VersionedConnection mapConnection(final ConnectionDTO dto) {
-        final VersionedConnection connection = new VersionedConnection();
-        connection.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        connection.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        connection.setName(dto.getName());
-        connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold());
-        connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold());
-        connection.setFlowFileExpiration(dto.getFlowFileExpiration());
-        connection.setLabelIndex(dto.getLabelIndex());
-        connection.setPosition(mapPosition(dto.getPosition()));
-        connection.setPrioritizers(dto.getPrioritizers());
-        connection.setSelectedRelationships(dto.getSelectedRelationships());
-        connection.setzIndex(dto.getzIndex());
-
-        connection.setBends(dto.getBends().stream()
-            .map(this::mapPosition)
-            .collect(Collectors.toList()));
-
-        connection.setSource(mapConnectable(dto.getSource()));
-        connection.setDestination(mapConnectable(dto.getDestination()));
-
-        return connection;
-    }
-
-    /**
-     * Maps one endpoint of a connection. The endpoint's own versioned id is used when present; otherwise
-     * the id recorded earlier for the endpoint's component id is used.
-     *
-     * @param dto the connection endpoint to convert
-     * @return the versioned representation of the endpoint
-     * @throws IllegalArgumentException if the endpoint has no versioned id and no id has been recorded for it
-     */
-    public ConnectableComponent mapConnectable(final ConnectableDTO dto) {
-        final ConnectableComponent component = new ConnectableComponent();
-
-        final String versionedId = dto.getVersionedComponentId();
-        if (versionedId == null) {
-            final String resolved = versionedComponentIds.get(dto.getId());
-            if (resolved == null) {
-                throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + dto.getId() + " to any version-controlled component");
-            }
-
-            component.setId(resolved);
-        } else {
-            component.setId(versionedId);
-        }
-
-        component.setComments(dto.getComments());
-        component.setGroupId(dto.getGroupId());
-        component.setName(dto.getName());
-        component.setType(ConnectableComponentType.valueOf(dto.getType()));
-        return component;
-    }
-
-    /**
-     * Maps a controller service, including its bundle and the service APIs it implements.
-     *
-     * @param dto the controller service to convert
-     * @return the populated versioned representation of the controller service
-     */
-    public VersionedControllerService mapControllerService(final ControllerServiceDTO dto) {
-        final VersionedControllerService service = new VersionedControllerService();
-        service.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        service.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        service.setName(dto.getName());
-        service.setAnnotationData(dto.getAnnotationData());
-        service.setBundle(mapBundle(dto.getBundle()));
-        service.setComments(dto.getComments());
-        service.setControllerServiceApis(dto.getControllerServiceApis().stream()
-            .map(this::mapControllerServiceApi)
-            .collect(Collectors.toList()));
-        service.setProperties(dto.getProperties());
-        service.setType(dto.getType());
-        return service;
-    }
-
-    /**
-     * Copies the group, artifact and version coordinates of a bundle.
-     *
-     * @param dto the bundle to convert
-     * @return the registry representation of the bundle
-     */
-    private Bundle mapBundle(final BundleDTO dto) {
-        final Bundle bundle = new Bundle();
-        bundle.setGroup(dto.getGroup());
-        bundle.setArtifact(dto.getArtifact());
-        bundle.setVersion(dto.getVersion());
-        return bundle;
-    }
-
-    /**
-     * Maps the description of a controller service API (its type and the bundle providing it).
-     *
-     * @param dto the controller service API to convert
-     * @return the registry representation of the API
-     */
-    private ControllerServiceAPI mapControllerServiceApi(final ControllerServiceApiDTO dto) {
-        final ControllerServiceAPI api = new ControllerServiceAPI();
-        api.setBundle(mapBundle(dto.getBundle()));
-        api.setType(dto.getType());
-        return api;
-    }
-
-    /**
-     * Maps a funnel; only its identifiers and position are carried over.
-     *
-     * @param dto the funnel to convert
-     * @return the versioned representation of the funnel
-     */
-    public VersionedFunnel mapFunnel(final FunnelDTO dto) {
-        final VersionedFunnel funnel = new VersionedFunnel();
-        funnel.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        funnel.setPosition(mapPosition(dto.getPosition()));
-        return funnel;
-    }
-
-    /**
-     * Maps a label, including its text, dimensions, position and style.
-     *
-     * @param dto the label to convert
-     * @return the versioned representation of the label
-     */
-    public VersionedLabel mapLabel(final LabelDTO dto) {
-        final VersionedLabel label = new VersionedLabel();
-        label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        label.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        label.setHeight(dto.getHeight());
-        label.setWidth(dto.getWidth());
-        label.setLabel(dto.getLabel());
-        label.setPosition(mapPosition(dto.getPosition()));
-        label.setStyle(dto.getStyle());
-        return label;
-    }
-
-    /**
-     * Maps an input or output port. The port type is obtained by passing the DTO's type string to
-     * {@code PortType.valueOf}.
-     *
-     * @param dto the port to convert
-     * @return the versioned representation of the port
-     */
-    public VersionedPort mapPort(final PortDTO dto) {
-        final VersionedPort port = new VersionedPort();
-        port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        port.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        port.setComments(dto.getComments());
-        port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
-        port.setName(dto.getName());
-        port.setPosition(mapPosition(dto.getPosition()));
-        port.setType(PortType.valueOf(dto.getType()));
-        return port;
-    }
-
-    /**
-     * Copies the x and y coordinates of a position.
-     *
-     * @param dto the position to convert; dereferenced without a null check
-     * @return the registry representation of the position
-     */
-    public Position mapPosition(final PositionDTO dto) {
-        final Position position = new Position();
-        position.setX(dto.getX());
-        position.setY(dto.getY());
-        return position;
-    }
-
-    /**
-     * Maps a processor. Scheduling, property and relationship settings are read from the processor's
-     * {@link ProcessorConfigDTO}; name, type, bundle, position and style come from the processor DTO itself.
-     *
-     * @param dto the processor to convert
-     * @return the versioned representation of the processor
-     */
-    public VersionedProcessor mapProcessor(final ProcessorDTO dto) {
-        final ProcessorConfigDTO config = dto.getConfig();
-
-        final VersionedProcessor processor = new VersionedProcessor();
-        processor.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        processor.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        processor.setType(dto.getType());
-        processor.setAnnotationData(config.getAnnotationData());
-        processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships());
-        processor.setBulletinLevel(config.getBulletinLevel());
-        processor.setBundle(mapBundle(dto.getBundle()));
-        processor.setComments(config.getComments());
-        processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount());
-        processor.setExecutionNode(config.getExecutionNode());
-        processor.setName(dto.getName());
-        processor.setPenaltyDuration(config.getPenaltyDuration());
-        processor.setPosition(mapPosition(dto.getPosition()));
-        processor.setProperties(config.getProperties());
-        processor.setRunDurationMillis(config.getRunDurationMillis());
-        processor.setSchedulingPeriod(config.getSchedulingPeriod());
-        processor.setSchedulingStrategy(config.getSchedulingStrategy());
-        processor.setStyle(dto.getStyle());
-        processor.setYieldDuration(config.getYieldDuration());
-        return processor;
-    }
-
-    /**
-     * Maps a remote process group together with its remote input and output ports.
-     *
-     * @param dto the remote process group to convert
-     * @return the versioned representation of the remote process group
-     */
-    public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroupDTO dto) {
-        final VersionedRemoteProcessGroup rpg = new VersionedRemoteProcessGroup();
-        // The group's own id is recorded here, before its ports are mapped, so that mapRemotePort can
-        // resolve the ports' group identifier through getGroupId.
-        rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
-        rpg.setComments(dto.getComments());
-        rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout());
-        rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface());
-        rpg.setName(dto.getName());
-        rpg.setInputPorts(dto.getContents().getInputPorts().stream()
-            .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT))
-            .collect(Collectors.toSet()));
-        rpg.setOutputPorts(dto.getContents().getOutputPorts().stream()
-            .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT))
-            .collect(Collectors.toSet()));
-        rpg.setPosition(mapPosition(dto.getPosition()));
-        rpg.setProxyHost(dto.getProxyHost());
-        rpg.setProxyPort(dto.getProxyPort());
-        rpg.setProxyUser(dto.getProxyUser());
-        rpg.setTargetUri(dto.getTargetUri());
-        rpg.setTargetUris(dto.getTargetUris());
-        rpg.setTransportProtocol(dto.getTransportProtocol());
-        rpg.setYieldDuration(dto.getYieldDuration());
-        return rpg;
-    }
-
-    /**
-     * Maps a port of a remote process group.
-     *
-     * @param dto the remote port to convert
-     * @param componentType the component type to stamp on the result
-     * @return the versioned representation of the remote port
-     */
-    public VersionedRemoteGroupPort mapRemotePort(final RemoteProcessGroupPortDTO dto, final ComponentType componentType) {
-        final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort();
-        port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
-        port.setGroupIdentifier(getGroupId(dto.getGroupId()));
-        port.setComments(dto.getComments());
-        port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
-        port.setRemoteGroupId(dto.getGroupId());
-        port.setName(dto.getName());
-        port.setUseCompression(dto.getUseCompression());
-        port.setBatchSize(mapBatchSettings(dto.getBatchSettings()));
-        port.setTargetId(dto.getTargetId());
-        port.setComponentType(componentType);
-        return port;
-    }
-
-    /**
-     * Copies the count, duration and size limits of a remote port's batch settings.
-     *
-     * @param dto the batch settings to convert; dereferenced without a null check
-     * @return the registry representation of the batch settings
-     */
-    private BatchSize mapBatchSettings(final BatchSettingsDTO dto) {
-        final BatchSize batchSize = new BatchSize();
-        batchSize.setCount(dto.getCount());
-        batchSize.setDuration(dto.getDuration());
-        batchSize.setSize(dto.getSize());
-        return batchSize;
-    }
-}