You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/03/04 12:20:06 UTC
[nifi] branch main updated: NIFI-9754: Introduced VersionedExternalFlow - Updated stateless and StandardProcessGroup, etc. to make use of VersionedExternalFlow - Updated StatelessDataflowDefinition to use ExternalVersionedFlow instead of generic type - Updated Stateless Bootstrap to avoid loading stateless engine libs from root class path but instead use a NarClassLoader to load the statelss nar
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8959226 NIFI-9754: Introduced VersionedExternalFlow - Updated stateless and StandardProcessGroup, etc. to make use of VersionedExternalFlow - Updated StatelessDataflowDefinition to use ExternalVersionedFlow instead of generic type - Updated Stateless Bootstrap to avoid loading stateless engine libs from root class path but instead use a NarClassLoader to load the statelss nar
8959226 is described below
commit 8959226b50cb2f3fc46722f32810ec06037985e4
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Feb 7 12:40:31 2022 -0500
NIFI-9754: Introduced VersionedExternalFlow
- Updated stateless and StandardProcessGroup, etc. to make use of VersionedExternalFlow
- Updated StatelessDataflowDefinition to use ExternalVersionedFlow instead of generic type
- Updated Stateless Bootstrap to avoid loading stateless engine libs from root class path but instead use a NarClassLoader to load the statelss nar
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #5832.
---
.../flow/ExternalControllerServiceReference.java | 2 +-
.../apache/nifi/flow/VersionedExternalFlow.java | 59 ++++++++++
.../nifi/flow/VersionedExternalFlowMetadata.java | 84 ++++++++++++++
.../org/apache/nifi}/flow/VersionedParameter.java | 2 +-
.../nifi}/flow/VersionedParameterContext.java | 4 +-
.../kafka/connect/StatelessKafkaConnectorUtil.java | 2 +-
.../nifi/groups/ProcessGroupSynchronizer.java | 6 +-
.../apache/nifi/groups/StandardProcessGroup.java | 9 +-
.../groups/StandardProcessGroupSynchronizer.java | 13 ++-
.../nifi/registry/flow/RestBasedFlowRegistry.java | 2 +
.../mapping/InstantiatedVersionedProcessGroup.java | 2 +-
.../flow/mapping/NiFiRegistryFlowMapper.java | 6 +-
.../flow/mapping/StandardComparableDataFlow.java | 2 +-
.../nifi/controller/flow/VersionedDataflow.java | 2 +-
.../java/org/apache/nifi/groups/ProcessGroup.java | 8 +-
.../apache/nifi/registry/flow/FlowRegistry.java | 2 +
.../serialization/VersionedDataflowMapper.java | 2 +-
.../serialization/VersionedFlowSynchronizer.java | 16 +--
.../controller/service/mock/MockProcessGroup.java | 8 +-
.../integration/MockSingleFlowRegistryClient.java | 4 +-
.../nifi/integration/versioned/ImportFlowIT.java | 122 +++++++++------------
.../flow/mapping/NiFiRegistryFlowMapperTest.java | 6 +-
.../authorization/AuthorizeParameterReference.java | 4 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 4 +-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 15 ++-
.../apache/nifi/web/api/FlowUpdateResource.java | 2 +-
.../apache/nifi/web/api/ProcessGroupResource.java | 2 +-
.../org/apache/nifi/web/dao/ProcessGroupDAO.java | 4 +-
.../nifi/web/dao/impl/StandardProcessGroupDAO.java | 6 +-
.../nifi/web/StandardNiFiServiceFacadeTest.java | 4 +-
.../processors/stateless/ExecuteStateless.java | 13 ++-
.../nifi/registry/VersionedFlowConverter.java | 54 +++++++++
.../nifi/registry/flow/VersionedFlowSnapshot.java | 4 +-
.../registry/flow/diff/ComparableDataFlow.java | 2 +-
.../flow/diff/StandardComparableDataFlow.java | 2 +-
.../registry/flow/diff/StandardFlowComparator.java | 4 +-
.../serialization/TestFlowContentSerializer.java | 2 +-
.../web/api/UnsecuredNiFiRegistryClientIT.java | 6 +-
.../nifi/stateless/flow/DataflowDefinition.java | 5 +-
.../stateless/flow/DataflowDefinitionParser.java | 4 +-
.../stateless/flow/StatelessDataflowFactory.java | 4 +-
.../nifi/stateless/bootstrap/RunStatelessFlow.java | 2 +-
.../stateless/bootstrap/StatelessBootstrap.java | 58 +++-------
.../reporting/StatelessReportingContext.java | 4 +-
.../reporting/StatelessReportingTaskNode.java | 4 +-
.../scheduling/StatelessProcessScheduler.java | 2 +-
.../nifi/registry/flow/InMemoryFlowRegistry.java | 55 +++++++---
.../config/PropertiesFileFlowDefinitionParser.java | 14 ++-
.../nifi/stateless/engine/ComponentBuilder.java | 5 +-
.../stateless/engine/StandardStatelessEngine.java | 22 ++--
.../nifi/stateless/engine/StatelessEngine.java | 4 +-
.../stateless/engine/StatelessFlowManager.java | 5 +-
.../stateless/engine/StatelessReloadComponent.java | 8 +-
.../stateless/flow/StandardDataflowDefinition.java | 34 +++---
.../flow/StandardStatelessDataflowFactory.java | 13 +--
.../nifi/stateless/flow/StandardStatelessFlow.java | 4 +-
.../TestPropertiesFileFlowDefinitionParser.java | 2 +-
.../apache/nifi/stateless/StatelessSystemIT.java | 21 +++-
.../nifi/stateless/VersionedFlowBuilder.java | 2 +-
.../stateless/parameters/ParameterContextIT.java | 4 +-
60 files changed, 479 insertions(+), 288 deletions(-)
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/ExternalControllerServiceReference.java b/nifi-api/src/main/java/org/apache/nifi/flow/ExternalControllerServiceReference.java
similarity index 97%
rename from nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/ExternalControllerServiceReference.java
rename to nifi-api/src/main/java/org/apache/nifi/flow/ExternalControllerServiceReference.java
index 3bc269e..2e7ba92 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/ExternalControllerServiceReference.java
+++ b/nifi-api/src/main/java/org/apache/nifi/flow/ExternalControllerServiceReference.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.registry.flow;
+package org.apache.nifi.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java
new file mode 100644
index 0000000..469ae7e
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow;
+
+import java.util.Map;
+
+public class VersionedExternalFlow {
+ private VersionedProcessGroup flowContents;
+ private Map<String, ExternalControllerServiceReference> externalControllerServices;
+ private Map<String, VersionedParameterContext> parameterContexts;
+ private VersionedExternalFlowMetadata metadata;
+
+ public VersionedProcessGroup getFlowContents() {
+ return flowContents;
+ }
+
+ public void setFlowContents(final VersionedProcessGroup flowContents) {
+ this.flowContents = flowContents;
+ }
+
+ public Map<String, ExternalControllerServiceReference> getExternalControllerServices() {
+ return externalControllerServices;
+ }
+
+ public void setExternalControllerServices(final Map<String, ExternalControllerServiceReference> externalControllerServices) {
+ this.externalControllerServices = externalControllerServices;
+ }
+
+ public Map<String, VersionedParameterContext> getParameterContexts() {
+ return parameterContexts;
+ }
+
+ public void setParameterContexts(final Map<String, VersionedParameterContext> parameterContexts) {
+ this.parameterContexts = parameterContexts;
+ }
+
+ public VersionedExternalFlowMetadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(final VersionedExternalFlowMetadata metadata) {
+ this.metadata = metadata;
+ }
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java
new file mode 100644
index 0000000..4486b5e
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow;
+
+public class VersionedExternalFlowMetadata {
+ private String bucketId;
+ private String flowId;
+ private int version;
+ private String flowName;
+ private String author;
+ private String comments;
+ private long timestamp;
+
+ public String getBucketIdentifier() {
+ return bucketId;
+ }
+
+ public void setBucketIdentifier(final String bucketIdentifier) {
+ this.bucketId = bucketIdentifier;
+ }
+
+ public String getFlowIdentifier() {
+ return flowId;
+ }
+
+ public void setFlowIdentifier(final String flowId) {
+ this.flowId = flowId;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(final int version) {
+ this.version = version;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowName(final String flowName) {
+ this.flowName = flowName;
+ }
+
+ public String getAuthor() {
+ return author;
+ }
+
+ public void setAuthor(final String author) {
+ this.author = author;
+ }
+
+ public String getComments() {
+ return comments;
+ }
+
+ public void setComments(final String comments) {
+ this.comments = comments;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ }
+}
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameter.java
similarity index 98%
rename from nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java
rename to nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameter.java
index 857dd16..ea5ee7b 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java
+++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.registry.flow;
+package org.apache.nifi.flow;
import io.swagger.annotations.ApiModelProperty;
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameterContext.java
similarity index 94%
rename from nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java
rename to nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameterContext.java
index 5cfefcd..a3b1d71 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameterContext.java
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.registry.flow;
+package org.apache.nifi.flow;
import io.swagger.annotations.ApiModelProperty;
-import org.apache.nifi.flow.ComponentType;
-import org.apache.nifi.flow.VersionedComponent;
import java.util.List;
import java.util.Set;
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index 91e3e52..4e33a48 100644
--- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -155,7 +155,7 @@ public class StatelessKafkaConnectorUtil {
final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
final String dataflowName = properties.get(DATAFLOW_NAME);
- final DataflowDefinition<?> dataflowDefinition;
+ final DataflowDefinition dataflowDefinition;
final StatelessBootstrap bootstrap;
try {
final Map<String, String> dataflowDefinitionProperties = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
index 688ea1c..25cea88 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
@@ -18,18 +18,18 @@
package org.apache.nifi.groups;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
public interface ProcessGroupSynchronizer {
/**
* Synchronize the given Process Group to match the proposed snaphsot
* @param group the Process Group to update
- * @param proposedSnapshot the proposed/desired state for the process group
+ * @param proposedFlow the proposed/desired state for the process group
* @param synchronizationOptions options for how to synchronize the group
*/
- void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
+ void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3e679c7..041d2cc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -64,6 +64,7 @@ import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.PropertyEncryptor;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
@@ -3778,7 +3779,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
+ public void updateFlow(final VersionedExternalFlow proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
final boolean updateDescendantVersionedFlows) {
final ComponentIdGenerator idGenerator = (proposedId, instanceId, destinationGroupId) -> generateUuid(proposedId, destinationGroupId, componentIdSeed);
@@ -3815,7 +3816,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void synchronizeFlow(final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+ public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
writeLock.lock();
try {
verifyCanUpdate(proposedSnapshot, true, !synchronizationOptions.isIgnoreLocalModifications());
@@ -3924,13 +3925,13 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
+ public void verifyCanUpdate(final VersionedExternalFlow updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
readLock.lock();
try {
// flow id match and not dirty check concepts are only applicable to versioned flows
final VersionControlInformation versionControlInfo = getVersionControlInformation();
if (versionControlInfo != null) {
- if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
+ if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getMetadata().getFlowIdentifier())) {
throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index baad3c4..3761f65 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -48,6 +48,7 @@ import org.apache.nifi.flow.ConnectableComponent;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
@@ -71,8 +72,8 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowComparator;
@@ -142,13 +143,13 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
@Override
- public void synchronize(final ProcessGroup group, final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions options) {
+ public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final GroupSynchronizationOptions options) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Currently Loaded Flow", versionedGroup);
- final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents());
+ final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", versionedExternalFlow.getFlowContents());
final PropertyDecryptor decryptor = options.getPropertyDecryptor();
final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt);
@@ -199,7 +200,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
- LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, proposedSnapshot,
+ LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
flowComparison.getDifferences().size(), differencesByLine);
}
@@ -217,7 +218,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
context.getFlowManager().withParameterContextResolution(() -> {
try {
- synchronize(group, proposedSnapshot.getFlowContents(), proposedSnapshot.getParameterContexts());
+ synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts());
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index 8ca2c37..97d78f6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -18,7 +18,9 @@
package org.apache.nifi.registry.flow;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.BucketClient;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
index b1d1a13..bcc35ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
@@ -17,7 +17,7 @@
package org.apache.nifi.registry.flow.mapping;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedProcessGroup;
import javax.xml.bind.annotation.XmlTransient;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index 2f0d8db..626544d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -67,12 +67,12 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java
index 2f210a0..c1514ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java
@@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.mapping;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import java.util.Collections;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java
index b7e4a03..b5ab3fe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java
@@ -20,7 +20,7 @@ package org.apache.nifi.controller.flow;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import java.util.List;
import java.util.Set;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index f642d36..5d253c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -33,6 +33,7 @@ import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
@@ -40,7 +41,6 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.remote.RemoteGroupPort;
@@ -880,7 +880,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
*/
- void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows);
+ void updateFlow(VersionedExternalFlow proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
* Updates the Process Group to match the proposed flow
@@ -889,7 +889,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param synchronizationOptions options for how the synchronization should occur
* @param flowMappingOptions options for how to map the existing dataflow into Versioned components so that it can be compared to the proposed snapshot
*/
- void synchronizeFlow(VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
+ void synchronizeFlow(VersionedExternalFlow proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
/**
* Verifies a template with the specified name can be created.
@@ -976,7 +976,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*
* @throws IllegalStateException if the Process Group is not in a state that will allow the update
*/
- void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
+ void verifyCanUpdate(VersionedExternalFlow updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
/**
* Ensures that the Process Group can have any local changes reverted
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 69b891d..5a7d10e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -18,6 +18,8 @@
package org.apache.nifi.registry.flow;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java
index 2c7e784..d6951eb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java
@@ -36,7 +36,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 2262103..14f2799 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -52,6 +52,9 @@ import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedReportingTask;
@@ -69,9 +72,6 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.persistence.FlowConfigurationArchiveManager;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.FlowComparator;
@@ -298,9 +298,9 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final Map<String, VersionedParameterContext> versionedParameterContextMap = new HashMap<>();
versionedFlow.getParameterContexts().forEach(context -> versionedParameterContextMap.put(context.getName(), context));
- final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
- versionedFlowSnapshot.setParameterContexts(versionedParameterContextMap);
- versionedFlowSnapshot.setFlowContents(versionedFlow.getRootGroup());
+ final VersionedExternalFlow versionedExternalFlow = new VersionedExternalFlow();
+ versionedExternalFlow.setParameterContexts(versionedParameterContextMap);
+ versionedExternalFlow.setFlowContents(versionedFlow.getRootGroup());
// Inherit controller-level components.
inheritControllerServices(controller, versionedFlow, affectedComponentSet);
@@ -313,7 +313,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final ComponentScheduler componentScheduler = new FlowControllerComponentScheduler(controller);
if (rootGroup.isEmpty()) {
- final VersionedProcessGroup versionedRoot = versionedFlowSnapshot.getFlowContents();
+ final VersionedProcessGroup versionedRoot = versionedExternalFlow.getFlowContents();
rootGroup = controller.getFlowManager().createProcessGroup(versionedRoot.getInstanceIdentifier());
rootGroup.setComments(versionedRoot.getComments());
rootGroup.setPosition(new Position(versionedRoot.getPosition().getX(), versionedRoot.getPosition().getY()));
@@ -350,7 +350,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
.mapControllerServiceReferencesToVersionedId(false)
.build();
- rootGroup.synchronizeFlow(versionedFlowSnapshot, syncOptions, flowMappingOptions);
+ rootGroup.synchronizeFlow(versionedExternalFlow, syncOptions, flowMappingOptions);
// Inherit templates, now that all necessary Process Groups have been created
inheritTemplates(controller, versionedFlow);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 0c9e711..2a2e641 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -33,6 +33,7 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.BatchCounts;
import org.apache.nifi.groups.DataValve;
import org.apache.nifi.groups.FlowFileConcurrency;
@@ -48,7 +49,6 @@ import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.RemoteGroupPort;
@@ -703,7 +703,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
+ public void verifyCanUpdate(VersionedExternalFlow updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
}
@Override
@@ -715,11 +715,11 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) {
+ public void updateFlow(VersionedExternalFlow proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) {
}
@Override
- public void synchronizeFlow(final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+ public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
index 7ee71b5..e226e09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java
@@ -19,13 +19,13 @@ package org.apache.nifi.integration;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import java.io.IOException;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index 97a51b1..92f711e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -26,9 +26,17 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.DirectInjectionExtensionManager;
import org.apache.nifi.integration.FrameworkIntegrationTest;
@@ -44,15 +52,7 @@ import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.parameter.StandardParameterReferenceManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.bucket.Bucket;
-import org.apache.nifi.flow.Bundle;
-import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
-import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
@@ -103,7 +103,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));
processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(), controllerService.getIdentifier()));
- final VersionedFlowSnapshot proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null);
+ final VersionedExternalFlow proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null);
// Create an Inner Process Group and update it to match the Versioned Flow.
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -146,8 +146,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "password"));
- // Create a VersionedFlowSnapshot that contains the processor
- final VersionedFlowSnapshot versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
+ // Create a VersionedExternalFlow that contains the processor
+ final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -173,7 +173,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, differences.iterator().next().getDifferenceType());
// Create a Versioned Flow that contains the Parameter Reference.
- final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
// Ensure no difference between the current configuration and the versioned flow
differences = getLocalModifications(innerGroup, versionedFlowWithParameterReference);
@@ -191,9 +191,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
- // Create a VersionedFlowSnapshot that contains the processor
+ // Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -223,9 +223,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
- // Create a VersionedFlowSnapshot that contains the processor
+ // Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -253,15 +253,15 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode initialProcessor = createProcessorNode(UsernamePasswordProcessor.class);
initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
- // Create a VersionedFlowSnapshot that contains the processor
+ // Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
Collections.singletonList(initialProcessor), Collections.singleton(parameter));
// Update processor to have an explicit value for the second version of the flow.
initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
- final VersionedFlowSnapshot versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -290,7 +290,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
initialProcessor.setProperties(initialProperties);
- final VersionedFlowSnapshot initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
final Map<String, String> updatedProperties = new HashMap<>();
@@ -298,7 +298,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
initialProcessor.setProperties(updatedProperties);
- final VersionedFlowSnapshot updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -335,7 +335,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}");
initialProcessor.setProperties(initialProperties);
- final VersionedFlowSnapshot initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
final Map<String, String> updatedProperties = new HashMap<>();
@@ -343,7 +343,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{other-param}");
initialProcessor.setProperties(updatedProperties);
- final VersionedFlowSnapshot updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -369,12 +369,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
- // Create a VersionedFlowSnapshot that contains the processor
+ // Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
Collections.singletonList(processorWithParamRef), Collections.singleton(parameter));
- final VersionedFlowSnapshot versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null);
+ final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -406,7 +406,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupA.addInputPort(port);
//Create a snapshot
- final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Create Process Group B under Process Group A
final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
@@ -421,7 +421,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection = connect(groupA, processor, port, processor.getRelationships());
//Create another snapshot
- final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Change Process Group A version to Version 1
groupA.updateFlow(version1, null, false, true, true);
@@ -463,7 +463,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection = connect(group, processor, port, processor.getRelationships());
//Create a snapshot
- final VersionedFlowSnapshot version1 = createFlowSnapshot(group);
+ final VersionedExternalFlow version1 = createFlowSnapshot(group);
//Create Funnel under Process Group
Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id");
@@ -476,7 +476,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
group.removeOutputPort(port);
//Create another snapshot
- final VersionedFlowSnapshot version2 = createFlowSnapshot(group);
+ final VersionedExternalFlow version2 = createFlowSnapshot(group);
//Change Process Group version to Version 1
group.updateFlow(version1, null, false, true, true);
@@ -534,7 +534,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
//Create a snapshot
- final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
connection1.setDestination(processor2);
@@ -543,7 +543,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
moveOutputPort(outputPort, groupB);
//Create another snapshot
- final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Delete connection 2
groupA.removeConnection(connection2);
@@ -552,7 +552,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupB.removeInputPort(inputPort);
//Create another snapshot
- final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
//Change Process Group version to Version 1
groupA.updateFlow(version1, null, false, true, true);
@@ -612,7 +612,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
//Create a snapshot
- final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
connection1.setDestination(processor2);
@@ -621,7 +621,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupA.removeOutputPort(outputPort);
//Create another snapshot
- final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Delete connection 2
groupA.removeConnection(connection2);
@@ -630,7 +630,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
moveInputPort(inputPort, groupA);
//Create another snapshot
- final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+ final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
//Change Process Group version to Version 1
groupA.updateFlow(version1, null, false, true, true);
@@ -686,10 +686,10 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
}
- private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
+ private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedExternalFlow VersionedExternalFlow) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager());
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, getFlowController().getControllerServiceProvider(), getFlowController().getFlowRegistryClient(), true);
- final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
+ final VersionedProcessGroup registryGroup = VersionedExternalFlow.getFlowContents();
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
@@ -707,14 +707,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
return differences;
}
- private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices,
+ private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices,
final List<ProcessorNode> processors, final Set<Parameter> parameters) {
- final VersionedFlowSnapshotMetadata snapshotMetadata = createSnapshotMetadata();
-
- final Bucket bucket = createBucket();
-
- final VersionedFlow flow = createVersionedFlow();
-
createBundle();
final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(getExtensionManager());
@@ -804,7 +798,14 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
flowContents.setFunnels(versionedFunnels);
flowContents.setConnections(versionedConnections);
- final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);
+ final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
+
+ final VersionedExternalFlowMetadata metadata = new VersionedExternalFlowMetadata();
+ externalFlow.setMetadata(metadata);
+ metadata.setBucketIdentifier("unit-test-bucket");
+ metadata.setFlowIdentifier("unit-test-flow");
+ metadata.setVersion(1);
+ metadata.setFlowName("unit-test-flow");
if (parameters != null) {
final Set<VersionedParameter> versionedParameters = new HashSet<>();
@@ -820,31 +821,22 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final VersionedParameterContext versionedParameterContext = new VersionedParameterContext();
versionedParameterContext.setName("Unit Test Context");
versionedParameterContext.setParameters(versionedParameters);
- versionedFlowSnapshot.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext));
+ externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext));
flowContents.setParameterContextName("Unit Test Context");
}
- return versionedFlowSnapshot;
+ return externalFlow;
}
- private VersionedFlowSnapshot createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
+ private VersionedExternalFlow createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
return createFlowSnapshot(null, controllerServices, processors, parameters);
}
- private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group) {
- return createFlowSnapshot(group, Collections.EMPTY_LIST, Collections.EMPTY_LIST, null);
+ private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) {
+ return createFlowSnapshot(group, Collections.emptyList(), Collections.emptyList(), null);
}
- @NotNull
- private VersionedFlowSnapshot createVersionedFlowSnapshot(VersionedFlowSnapshotMetadata snapshotMetadata, Bucket bucket, VersionedFlow flow, VersionedProcessGroup flowContents) {
- final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
- versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata);
- versionedFlowSnapshot.setBucket(bucket);
- versionedFlowSnapshot.setFlow(flow);
- versionedFlowSnapshot.setFlowContents(flowContents);
- return versionedFlowSnapshot;
- }
@NotNull
private VersionedProcessGroup createFlowContents() {
@@ -882,14 +874,4 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
return bucket;
}
- @NotNull
- private VersionedFlowSnapshotMetadata createSnapshotMetadata() {
- final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
- snapshotMetadata.setAuthor("unit-test");
- snapshotMetadata.setBucketIdentifier("unit-test-bucket");
- snapshotMetadata.setFlowIdentifier("unit-test-flow");
- snapshotMetadata.setTimestamp(System.currentTimeMillis());
- snapshotMetadata.setVersion(1);
- return snapshotMetadata;
- }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
index a02246a..b5e52ae 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
@@ -51,7 +51,7 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.flow.ComponentType;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.flow.PortType;
@@ -61,8 +61,8 @@ import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java
index 95d4540..5bd8eed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java
@@ -25,8 +25,8 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterTokenList;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index feb0fbe..1bbc4d9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -29,10 +29,10 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.parameter.ParameterContext;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9c50aba..066b131 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -98,11 +98,14 @@ import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConfigurableComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
@@ -129,10 +132,10 @@ import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.authorization.Permissions;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
@@ -141,7 +144,6 @@ import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
@@ -4919,7 +4921,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
- group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
+ final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(proposedFlow);
+ group.verifyCanUpdate(externalFlow, verifyConnectionRemoval, verifyNotDirty);
}
@Override
@@ -4936,7 +4939,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// verify that the process group can be updated to the given snapshot. We do not verify that connections can
// be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
// have been stopped.
- group.verifyCanUpdate(versionedFlowSnapshot, false, false);
+ final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot);
+ group.verifyCanUpdate(externalFlow, false, false);
}
@Override
@@ -5347,7 +5351,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RevisionUpdate<ProcessGroupDTO> update() {
// update the Process Group
- final ProcessGroup updatedProcessGroup = processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings,
+ final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(proposedFlowSnapshot);
+ processGroupDAO.updateProcessGroupFlow(groupId, externalFlow, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings,
updateDescendantVersionedFlows);
// update the revisions
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 97a23f8..bcb0924 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -31,7 +31,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.ResumeFlowException;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 4061a8c..c04204b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -108,7 +108,7 @@ import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 7d5739a..47abf91 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -19,8 +19,8 @@ package org.apache.nifi.web.dao;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
@@ -136,7 +136,7 @@ public interface ProcessGroupDAO {
* update the contents of that Process Group
* @return the process group
*/
- ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
+ ProcessGroup updateProcessGroupFlow(String groupId, VersionedExternalFlow proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 2782624..45ee5c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -28,6 +28,8 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
@@ -36,8 +38,6 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
@@ -424,7 +424,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
@Override
- public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
+ public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedExternalFlow proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 616e062..87d80c1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -40,11 +40,11 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.web.api.dto.DtoFactory;
diff --git a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
index 3cd7a77..f23af81 100644
--- a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
+++ b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
@@ -60,6 +61,7 @@ import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.DataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.RegistryDataflowProvider;
+import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@@ -462,7 +464,7 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(context, dataflowIndex);
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, Thread.currentThread().getContextClassLoader());
- final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition = createDataflowDefinition(context, flowSnapshot);
+ final DataflowDefinition dataflowDefinition = createDataflowDefinition(context, flowSnapshot);
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
@@ -746,7 +748,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
}
- private DataflowDefinition<VersionedFlowSnapshot> createDataflowDefinition(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) {
+ private DataflowDefinition createDataflowDefinition(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) {
+ final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(flowSnapshot);
final ParameterValueProviderDefinition parameterValueProviderDefinition = new ParameterValueProviderDefinition();
parameterValueProviderDefinition.setType("org.apache.nifi.stateless.parameter.OverrideParameterValueProvider");
parameterValueProviderDefinition.setName("Parameter Override");
@@ -783,10 +786,10 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
}
};
- return new DataflowDefinition<VersionedFlowSnapshot>() {
+ return new DataflowDefinition() {
@Override
- public VersionedFlowSnapshot getFlowSnapshot() {
- return flowSnapshot;
+ public VersionedExternalFlow getVersionedExternalFlow() {
+ return externalFlow;
}
@Override
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java
new file mode 100644
index 0000000..37718bc
--- /dev/null
+++ b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry;
+
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedExternalFlowMetadata;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+
+public class VersionedFlowConverter {
+ public static VersionedExternalFlow createVersionedExternalFlow(final VersionedFlowSnapshot flowSnapshot) {
+ final VersionedExternalFlowMetadata externalFlowMetadata = new VersionedExternalFlowMetadata();
+ final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+ if (snapshotMetadata != null) {
+ externalFlowMetadata.setAuthor(snapshotMetadata.getAuthor());
+ externalFlowMetadata.setBucketIdentifier(snapshotMetadata.getBucketIdentifier());
+ externalFlowMetadata.setComments(snapshotMetadata.getComments());
+ externalFlowMetadata.setFlowIdentifier(snapshotMetadata.getFlowIdentifier());
+ externalFlowMetadata.setTimestamp(snapshotMetadata.getTimestamp());
+ externalFlowMetadata.setVersion(snapshotMetadata.getVersion());
+ }
+
+ final VersionedFlow versionedFlow = flowSnapshot.getFlow();
+ if (versionedFlow == null) {
+ externalFlowMetadata.setFlowName(flowSnapshot.getFlowContents().getName());
+ } else {
+ externalFlowMetadata.setFlowName(versionedFlow.getName());
+ }
+
+ final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
+ externalFlow.setFlowContents(flowSnapshot.getFlowContents());
+ externalFlow.setExternalControllerServices(flowSnapshot.getExternalControllerServices());
+ externalFlow.setParameterContexts(flowSnapshot.getParameterContexts());
+ externalFlow.setMetadata(externalFlowMetadata);
+
+ return externalFlow;
+ }
+}
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
index e702f55..a9a48e5 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
+++ b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
@@ -19,6 +19,8 @@ package org.apache.nifi.registry.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
@@ -117,7 +119,7 @@ public class VersionedFlowSnapshot {
@ApiModelProperty(value = "The parameter contexts referenced by process groups in the flow contents. " +
"The mapping is from the name of the context to the context instance, and it is expected that any " +
"context in this map is referenced by at least one process group in this flow.")
- public Map<String,VersionedParameterContext> getParameterContexts() {
+ public Map<String, VersionedParameterContext> getParameterContexts() {
return parameterContexts;
}
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java
index 5e7470c..b47def5 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java
+++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java
@@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.diff;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import java.util.Set;
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java
index 68b112a..44ada2c 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java
+++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java
@@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.diff;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import java.util.Collections;
import java.util.HashSet;
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
index 2664425..02751c6 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
+++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
@@ -30,8 +30,8 @@ import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import java.util.Collection;
import java.util.Collections;
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
index f598370..8bfcaab 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
+++ b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.registry.serialization;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java b/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
index 18a5756..bc43b8f 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
+++ b/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java
@@ -60,12 +60,12 @@ import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup;
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersion;
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
import org.apache.nifi.registry.field.Fields;
-import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java
index 2432684..b8a66cd 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java
@@ -17,6 +17,7 @@
package org.apache.nifi.stateless.flow;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterValueProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
@@ -24,8 +25,8 @@ import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import java.util.List;
import java.util.Set;
-public interface DataflowDefinition<T> {
- T getFlowSnapshot();
+public interface DataflowDefinition {
+ VersionedExternalFlow getVersionedExternalFlow();
String getFlowName();
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java
index 821171f..7f0b29b 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java
@@ -27,9 +27,9 @@ import java.util.List;
import java.util.Map;
public interface DataflowDefinitionParser {
- DataflowDefinition<?> parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
+ DataflowDefinition parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException;
- DataflowDefinition<?> parseFlowDefinition(Map<String, String> configurationProperties, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
+ DataflowDefinition parseFlowDefinition(Map<String, String> configurationProperties, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException;
}
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java
index df61abab..7dd538c 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java
@@ -22,7 +22,7 @@ import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import java.io.IOException;
-public interface StatelessDataflowFactory<T> {
- StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition<T> dataflowDefinition, ClassLoader extensionClassLoader)
+public interface StatelessDataflowFactory {
+ StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition dataflowDefinition, ClassLoader extensionClassLoader)
throws IOException, StatelessConfigurationException;
}
diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
index a4ec889..3a9c181 100644
--- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
@@ -89,7 +89,7 @@ public class RunStatelessFlow {
final long initializeStart = System.currentTimeMillis();
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration);
- final DataflowDefinition<?> dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);
+ final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
index 80754ad..58b1d4e 100644
--- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
+++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
@@ -67,24 +68,24 @@ public class StatelessBootstrap {
this.engineConfiguration = engineConfiguration;
}
- public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition)
+ public StatelessDataflow createDataflow(final DataflowDefinition dataflowDefinition)
throws IOException, StatelessConfigurationException {
- final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class);
+ final StatelessDataflowFactory dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class);
final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, extensionClassLoader);
return dataflow;
}
- public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
+ public DataflowDefinition parseDataflowDefinition(final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException {
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class);
- final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides);
+ final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides);
return dataflowDefinition;
}
- public DataflowDefinition<?> parseDataflowDefinition(final Map<String, String> flowDefinitionProperties, final List<ParameterOverride> parameterOverrides)
+ public DataflowDefinition parseDataflowDefinition(final Map<String, String> flowDefinitionProperties, final List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException {
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class);
- final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides);
+ final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides);
return dataflowDefinition;
}
@@ -119,47 +120,18 @@ public class StatelessBootstrap {
final long unpackMillis = System.currentTimeMillis() - unpackStart;
logger.info("Unpacked NAR files in {} millis", unpackMillis);
- final File statelessNarWorkingDir = locateStatelessNarWorkingDirectory(extensionsWorkingDir);
- final File statelessNarInf = new File(statelessNarWorkingDir, "NAR-INF");
- final File statelessNarDependencies = new File(statelessNarInf, "bundled-dependencies");
- final File[] statelessNarContents = statelessNarDependencies.listFiles();
- if (statelessNarContents == null || statelessNarContents.length == 0) {
- throw new IOException("Could not access contents of Stateless NAR dependencies at " + statelessNarDependencies);
- }
+ final BlockListClassLoader statelessClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader);
- final List<URL> urls = new ArrayList<>();
- final List<String> filenames = new ArrayList<>();
- for (final File dependency : statelessNarContents) {
- final URL url = dependency.toURI().toURL();
- urls.add(url);
- filenames.add(dependency.getName());
+ final File statelessNarWorkingDir = locateStatelessNarWorkingDirectory(extensionsWorkingDir);
+ final NarClassLoader engineClassLoader;
+ try {
+ engineClassLoader = new NarClassLoader(statelessNarWorkingDir, statelessClassLoader);
+ } catch (final ClassNotFoundException e) {
+ throw new IOException("Could not create NarClassLoader for Stateless NAR located at " + statelessNarWorkingDir.getAbsolutePath(), e);
}
- logger.info("Creating Stateless Bootstrap with the following files in the classpath: {}", filenames);
-
- final URL[] urlArray = urls.toArray(new URL[0]);
- final BlockListClassLoader extensionClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader);
-
- final Set<String> classesBlockedExtensions = extensionClassLoader.getClassesBlocked();
-
- // For the engine ClassLoader, we also want to block everything that we block for extensions except for the classes
- // that the engine needs specifically (i.e., the classes in the stateless engine nar). We do this because there are some
- // classes that may need to be shared between the bootstrap and the caller. For example, VersionedFlowSnapshot.
- final URLClassLoader engineUrlClassLoader = new URLClassLoader(urlArray, rootClassLoader);
- final Set<String> engineSpecificClassNames = new HashSet<>();
- final Set<String> engineSpecificFiles = new HashSet<>();
- findClassNamesInJars(urls, engineSpecificClassNames, engineSpecificFiles);
-
- final Set<String> classesBlockedEngine = new HashSet<>(classesBlockedExtensions);
- classesBlockedEngine.removeAll(engineSpecificClassNames);
-
- logger.debug("Blocking the following classes from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, classesBlockedEngine);
- logger.debug("Blocking the following files from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, engineSpecificFiles);
-
- final BlockListClassLoader engineClassLoader = new BlockListClassLoader(engineUrlClassLoader, classesBlockedEngine);
-
Thread.currentThread().setContextClassLoader(engineClassLoader);
- return new StatelessBootstrap(engineClassLoader, extensionClassLoader, engineConfiguration);
+ return new StatelessBootstrap(engineClassLoader, statelessClassLoader, engineConfiguration);
}
/**
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
index 6ae6760..f0b1d29 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
@@ -30,10 +30,10 @@ import org.apache.nifi.stateless.engine.StatelessEngine;
import java.util.Map;
public class StatelessReportingContext extends AbstractReportingContext implements ReportingContext {
- private final StatelessEngine<?> statelessEngine;
+ private final StatelessEngine statelessEngine;
private final FlowManager flowManager;
- public StatelessReportingContext(final StatelessEngine<?> statelessEngine, final FlowManager flowManager,
+ public StatelessReportingContext(final StatelessEngine statelessEngine, final FlowManager flowManager,
final Map<PropertyDescriptor, String> properties, final ReportingTask reportingTask,
final VariableRegistry variableRegistry, final ParameterLookup parameterLookup) {
super(reportingTask, statelessEngine.getBulletinRepository(), properties, statelessEngine.getControllerServiceProvider(), parameterLookup, variableRegistry);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
index 86e4c21..5a24f37 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
@@ -35,9 +35,9 @@ import org.apache.nifi.stateless.engine.StatelessEngine;
public class StatelessReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode {
private final FlowManager flowManager;
- private final StatelessEngine<?> statelessEngine;
+ private final StatelessEngine statelessEngine;
- public StatelessReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final StatelessEngine<?> statelessEngine,
+ public StatelessReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final StatelessEngine statelessEngine,
final FlowManager flowManager, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
final ValidationTrigger validationTrigger) {
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index c129249..5b32d28 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -104,7 +104,7 @@ public class StatelessProcessScheduler implements ProcessScheduler {
}
}
- public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition<?> dataflowDefinition) {
+ public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition dataflowDefinition) {
this.processContextFactory = processContextFactory;
final String threadNameSuffix = dataflowDefinition.getFlowName() == null ? "" : " for dataflow " + dataflowDefinition.getFlowName();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
index a058f59..f72baf3 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
@@ -18,6 +18,10 @@
package org.apache.nifi.registry.flow;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedExternalFlowMetadata;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
@@ -40,7 +44,7 @@ public class InMemoryFlowRegistry implements FlowRegistry {
private volatile String name;
private volatile String url;
- private final Map<FlowCoordinates, List<VersionedFlowSnapshot>> flowSnapshots = new ConcurrentHashMap<>();
+ private final Map<FlowCoordinates, List<VersionedExternalFlow>> flowSnapshots = new ConcurrentHashMap<>();
@Override
public String getIdentifier() {
@@ -109,9 +113,9 @@ public class InMemoryFlowRegistry implements FlowRegistry {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
- final Map<String, ExternalControllerServiceReference> externalControllerServices,
- final Map<String, VersionedParameterContext> parameterContexts, final String comments,
- final int expectedVersion, final NiFiUser user) {
+ final Map<String, ExternalControllerServiceReference> externalControllerServices,
+ final Map<String, VersionedParameterContext> parameterContexts, final String comments,
+ final int expectedVersion, final NiFiUser user) {
throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED);
}
@@ -133,21 +137,44 @@ public class InMemoryFlowRegistry implements FlowRegistry {
@Override
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws NiFiRegistryException {
final FlowCoordinates flowCoordinates = new FlowCoordinates(bucketId, flowId);
- final List<VersionedFlowSnapshot> snapshots = flowSnapshots.get(flowCoordinates);
+ final List<VersionedExternalFlow> snapshots = flowSnapshots.get(flowCoordinates);
- final VersionedFlowSnapshot versionedFlowSnapshot = snapshots.stream()
- .filter(snapshot -> snapshot.getSnapshotMetadata().getVersion() == version)
+ final VersionedExternalFlow versionedExternalFlow = snapshots.stream()
+ .filter(snapshot -> snapshot.getMetadata().getVersion() == version)
.findAny()
.orElseThrow(() -> new NiFiRegistryException("Could not find flow: bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version));
+ final VersionedFlowSnapshot versionedFlowSnapshot = convertToVersionedFlowSnapshot(versionedExternalFlow);
return versionedFlowSnapshot;
}
+ private VersionedFlowSnapshot convertToVersionedFlowSnapshot(final VersionedExternalFlow externalFlow) {
+ final VersionedExternalFlowMetadata externalFlowMetadata = externalFlow.getMetadata();
+
+ final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+ snapshotMetadata.setBucketIdentifier(externalFlowMetadata.getBucketIdentifier());
+ snapshotMetadata.setVersion(externalFlowMetadata.getVersion());
+ snapshotMetadata.setFlowIdentifier(externalFlowMetadata.getFlowIdentifier());
+
+ final VersionedFlow versionedFlow = new VersionedFlow();
+ versionedFlow.setName(externalFlowMetadata.getFlowName());
+ versionedFlow.setIdentifier(externalFlowMetadata.getFlowIdentifier());
+ versionedFlow.setBucketIdentifier(externalFlowMetadata.getBucketIdentifier());
+
+ final VersionedFlowSnapshot flowSnapshot = new VersionedFlowSnapshot();
+ flowSnapshot.setExternalControllerServices(externalFlow.getExternalControllerServices());
+ flowSnapshot.setFlowContents(externalFlow.getFlowContents());
+ flowSnapshot.setParameterContexts(externalFlow.getParameterContexts());
+ flowSnapshot.setSnapshotMetadata(snapshotMetadata);
+ flowSnapshot.setFlow(versionedFlow);
+
+ return flowSnapshot;
+ }
@Override
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) {
final FlowCoordinates flowCoordinates = new FlowCoordinates(bucketId, flowId);
- final List<VersionedFlowSnapshot> snapshots = flowSnapshots.get(flowCoordinates);
+ final List<VersionedExternalFlow> snapshots = flowSnapshots.get(flowCoordinates);
final VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setBucketIdentifier(bucketId);
@@ -160,8 +187,8 @@ public class InMemoryFlowRegistry implements FlowRegistry {
}
- public synchronized void addFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
- final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+ public synchronized void addFlowSnapshot(final VersionedExternalFlow versionedExternalFlow) {
+ final VersionedExternalFlowMetadata metadata = versionedExternalFlow.getMetadata();
final String bucketId;
final String flowId;
final int version;
@@ -177,16 +204,16 @@ public class InMemoryFlowRegistry implements FlowRegistry {
final FlowCoordinates coordinates = new FlowCoordinates(bucketId, flowId);
- final List<VersionedFlowSnapshot> snapshots = flowSnapshots.computeIfAbsent(coordinates, key -> Collections.synchronizedList(new ArrayList<>()));
- final Optional<VersionedFlowSnapshot> optionalSnapshot = snapshots.stream()
- .filter(snapshot -> snapshot.getSnapshotMetadata().getVersion() == version)
+ final List<VersionedExternalFlow> snapshots = flowSnapshots.computeIfAbsent(coordinates, key -> Collections.synchronizedList(new ArrayList<>()));
+ final Optional<VersionedExternalFlow> optionalSnapshot = snapshots.stream()
+ .filter(snapshot -> snapshot.getMetadata().getVersion() == version)
.findAny();
if (optionalSnapshot.isPresent()) {
throw new IllegalStateException("Versioned Flow Snapshot already exists for bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version);
}
- snapshots.add(flowSnapshot);
+ snapshots.add(versionedExternalFlow);
}
private static class FlowCoordinates {
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
index 92f23fa..9221bdb 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
@@ -24,7 +24,9 @@ import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
@@ -98,13 +100,13 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
private static final String TRANSACTION_THRESHOLD_TIME = "nifi.stateless.transaction.thresholds.time";
- public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List<ParameterOverride> parameterOverrides)
+ public DataflowDefinition parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List<ParameterOverride> parameterOverrides)
throws IOException, StatelessConfigurationException {
final Map<String, String> properties = readPropertyValues(propertiesFile);
return parseFlowDefinition(properties, engineConfig, parameterOverrides);
}
- public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig,
+ public DataflowDefinition parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig,
final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
// A common problem is users accidentally including whitespace at the beginning or end of property values.
@@ -115,17 +117,16 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
final Set<String> failurePortNames = getFailurePortNames(properties);
final VersionedFlowSnapshot flowSnapshot = fetchVersionedFlowSnapshot(properties, engineConfig.getSslContext());
+ final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(flowSnapshot);
final List<ParameterContextDefinition> parameterContextDefinitions = getParameterContexts(properties);
final List<ReportingTaskDefinition> reportingTaskDefinitions = getReportingTasks(properties);
final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions = getParameterValueProviders(properties, parameterOverrides);
final TransactionThresholds transactionThresholds = getTransactionThresholds(properties);
- final String rootGroupName = flowSnapshot.getFlowContents().getName();
- final String flowName = properties.getOrDefault(FLOW_NAME, rootGroupName);
+ final String flowName = properties.getOrDefault(FLOW_NAME, externalFlow.getMetadata().getFlowName());
return new StandardDataflowDefinition.Builder()
- .flowSnapshot(flowSnapshot)
- .flowName(flowName)
+ .versionedExternalFlow(externalFlow)
.failurePortNames(failurePortNames)
.parameterContexts(parameterContextDefinitions)
.reportingTasks(reportingTaskDefinitions)
@@ -480,6 +481,7 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
return envValue == null ? "" : envValue;
}
+
private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, String> properties, final SslContextDefinition sslContextDefinition)
throws IOException, StatelessConfigurationException {
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
index f61816c..0bbf66b 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
@@ -56,7 +56,6 @@ import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
@@ -74,14 +73,14 @@ import java.util.Set;
public class ComponentBuilder {
private static final Logger logger = LoggerFactory.getLogger(ComponentBuilder.class);
- private StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+ private StatelessEngine statelessEngine;
private FlowManager flowManager;
private String identifier;
private String type;
private BundleCoordinate bundleCoordinate;
private Set<URL> additionalClassPathUrls;
- public ComponentBuilder statelessEngine(final StatelessEngine<VersionedFlowSnapshot> statelessEngine) {
+ public ComponentBuilder statelessEngine(final StatelessEngine statelessEngine) {
this.statelessEngine = statelessEngine;
return this;
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index d683b63..d66d269 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -43,6 +43,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
@@ -57,8 +58,6 @@ import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.VersionedFlow;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -98,7 +97,7 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
-public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
+public class StandardStatelessEngine implements StatelessEngine {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
private static final int CONCURRENT_EXTENSION_DOWNLOADS = 8;
public static final Duration DEFAULT_STATUS_TASK_PERIOD = Duration.of(1, ChronoUnit.MINUTES);
@@ -158,13 +157,14 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
@Override
- public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
+ public StatelessDataflow createFlow(final DataflowDefinition dataflowDefinition) {
if (!this.initialized) {
throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
}
- final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow();
- logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName());
+ final VersionedExternalFlowMetadata metadata = dataflowDefinition.getVersionedExternalFlow().getMetadata();
+ final String flowName = metadata == null ? "" : metadata.getFlowName();
+ logger.info("Building Dataflow {}", flowName);
loadNecessaryExtensions(dataflowDefinition);
@@ -178,7 +178,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
rootGroup.addProcessGroup(childGroup);
LogRepositoryFactory.purge();
- childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
+ childGroup.updateFlow(dataflowDefinition.getVersionedExternalFlow(), "stateless-component-id-seed", false, true, true);
final ParameterValueProvider parameterValueProvider = createParameterValueProvider(dataflowDefinition);
@@ -205,7 +205,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return dataflow;
}
- private ParameterValueProvider createParameterValueProvider(final DataflowDefinition<?> dataflowDefinition) {
+ private ParameterValueProvider createParameterValueProvider(final DataflowDefinition dataflowDefinition) {
// Create a Provider for each definition
final List<ParameterValueProvider> providers = new ArrayList<>();
for (final ParameterValueProviderDefinition definition : dataflowDefinition.getParameterValueProviderDefinitions()) {
@@ -300,8 +300,8 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
}
- private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
- final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
+ private void loadNecessaryExtensions(final DataflowDefinition dataflowDefinition) {
+ final VersionedProcessGroup group = dataflowDefinition.getVersionedExternalFlow().getFlowContents();
final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
@@ -380,7 +380,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}
- private List<ReportingTaskNode> createReportingTasks(final DataflowDefinition<?> dataflowDefinition) {
+ private List<ReportingTaskNode> createReportingTasks(final DataflowDefinition dataflowDefinition) {
final List<ReportingTaskNode> reportingTaskNodes = new ArrayList<>();
for (final ReportingTaskDefinition taskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
final ReportingTaskNode taskNode = createReportingTask(taskDefinition);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index d6533d7..941aca2 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -37,11 +37,11 @@ import org.apache.nifi.stateless.flow.StatelessDataflow;
import java.time.Duration;
-public interface StatelessEngine<T> {
+public interface StatelessEngine {
void initialize(StatelessEngineInitializationContext initializationContext);
- StatelessDataflow createFlow(DataflowDefinition<T> dataflowDefinition);
+ StatelessDataflow createFlow(DataflowDefinition dataflowDefinition);
ExtensionManager getExtensionManager();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index 6cd43b9..83727b7 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -65,7 +65,6 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.StandardRemoteProcessGroup;
import org.apache.nifi.reporting.BulletinRepository;
@@ -91,12 +90,12 @@ import static java.util.Objects.requireNonNull;
public class StatelessFlowManager extends AbstractFlowManager implements FlowManager {
private static final Logger logger = LoggerFactory.getLogger(StatelessFlowManager.class);
- private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+ private final StatelessEngine statelessEngine;
private final SSLContext sslContext;
private final BulletinRepository bulletinRepository;
public StatelessFlowManager(final FlowFileEventRepository flowFileEventRepository, final ParameterContextManager parameterContextManager,
- final StatelessEngine<VersionedFlowSnapshot> statelessEngine, final BooleanSupplier flowInitializedCheck,
+ final StatelessEngine statelessEngine, final BooleanSupplier flowInitializedCheck,
final SSLContext sslContext, final BulletinRepository bulletinRepository) {
super(flowFileEventRepository, parameterContextManager, statelessEngine.getFlowRegistryClient(), flowInitializedCheck);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
index 48a0dfd..831bf71 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
@@ -29,7 +29,6 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.TerminationAwareLogger;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.StandardConfigurationContext;
@@ -40,7 +39,6 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
@@ -52,9 +50,9 @@ import java.util.Set;
public class StatelessReloadComponent implements ReloadComponent {
private static final Logger logger = LoggerFactory.getLogger(StatelessReloadComponent.class);
- private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+ private final StatelessEngine statelessEngine;
- public StatelessReloadComponent(final StatelessEngine<VersionedFlowSnapshot> statelessEngine) {
+ public StatelessReloadComponent(final StatelessEngine statelessEngine) {
this.statelessEngine = statelessEngine;
}
@@ -169,7 +167,7 @@ public class StatelessReloadComponent implements ReloadComponent {
}
@Override
- public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls) throws ReportingTaskInstantiationException {
+ public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls) {
if (existingNode == null) {
throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java
index e1bb1e6..25e1b0a 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java
@@ -19,10 +19,10 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterValueProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
@@ -35,33 +35,31 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
-public class StandardDataflowDefinition implements DataflowDefinition<VersionedFlowSnapshot> {
- private final VersionedFlowSnapshot flowSnapshot;
+public class StandardDataflowDefinition implements DataflowDefinition {
+ private final VersionedExternalFlow versionedExternalFlow;
private final Set<String> failurePortNames;
private final List<ParameterContextDefinition> parameterContexts;
private final List<ReportingTaskDefinition> reportingTaskDefinitions;
private final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions;
private final TransactionThresholds transactionThresholds;
- private final String flowName;
private StandardDataflowDefinition(final Builder builder) {
- flowSnapshot = requireNonNull(builder.flowSnapshot, "Flow Snapshot must be provided");
+ versionedExternalFlow = requireNonNull(builder.versionedExternalFlow, "Flow Snapshot must be provided");
failurePortNames = builder.failurePortNames == null ? Collections.emptySet() : builder.failurePortNames;
parameterContexts = builder.parameterContexts == null ? Collections.emptyList() : builder.parameterContexts;
reportingTaskDefinitions = builder.reportingTaskDefinitions == null ? Collections.emptyList() : builder.reportingTaskDefinitions;
transactionThresholds = builder.transactionThresholds == null ? TransactionThresholds.SINGLE_FLOWFILE : builder.transactionThresholds;
parameterValueProviderDefinitions = builder.parameterValueProviderDefinitions == null ? Collections.emptyList() : builder.parameterValueProviderDefinitions;
- flowName = builder.flowName;
}
@Override
- public VersionedFlowSnapshot getFlowSnapshot() {
- return flowSnapshot;
+ public VersionedExternalFlow getVersionedExternalFlow() {
+ return versionedExternalFlow;
}
@Override
public String getFlowName() {
- return flowName;
+ return versionedExternalFlow.getMetadata().getFlowName();
}
@Override
@@ -71,14 +69,14 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
@Override
public Set<String> getInputPortNames() {
- return flowSnapshot.getFlowContents().getInputPorts().stream()
+ return versionedExternalFlow.getFlowContents().getInputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
@Override
public Set<String> getOutputPortNames() {
- return flowSnapshot.getFlowContents().getOutputPorts().stream()
+ return versionedExternalFlow.getFlowContents().getOutputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
@@ -105,7 +103,7 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
public Set<Bundle> getReferencedBundles() {
final Set<Bundle> referenced = new HashSet<>();
- final VersionedProcessGroup rootGroup = flowSnapshot.getFlowContents();
+ final VersionedProcessGroup rootGroup = versionedExternalFlow.getFlowContents();
discoverReferencedBundles(rootGroup, referenced);
return referenced;
}
@@ -125,21 +123,15 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
}
public static class Builder {
- private VersionedFlowSnapshot flowSnapshot;
+ private VersionedExternalFlow versionedExternalFlow;
private Set<String> failurePortNames;
private List<ParameterContextDefinition> parameterContexts;
private List<ReportingTaskDefinition> reportingTaskDefinitions;
private List<ParameterValueProviderDefinition> parameterValueProviderDefinitions;
private TransactionThresholds transactionThresholds;
- private String flowName;
- public Builder flowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
- this.flowSnapshot = flowSnapshot;
- return this;
- }
-
- public Builder flowName(final String flowName) {
- this.flowName = flowName;
+ public Builder versionedExternalFlow(final VersionedExternalFlow versionedExternalFlow) {
+ this.versionedExternalFlow = versionedExternalFlow;
return this;
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 459aacb..d95c9fe 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -52,7 +52,6 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
@@ -88,17 +87,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
+public class StandardStatelessDataflowFactory implements StatelessDataflowFactory {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
- @Override
- public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+
+ public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition dataflowDefinition,
final ClassLoader extensionRootClassLoader)
throws IOException, StatelessConfigurationException {
final long start = System.currentTimeMillis();
- final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
-
ProvenanceRepository provenanceRepo = null;
ContentRepository contentRepo = null;
StatelessProcessScheduler processScheduler = null;
@@ -114,7 +111,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
}
final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
- flowRegistry.addFlowSnapshot(flowSnapshot);
+ flowRegistry.addFlowSnapshot(dataflowDefinition.getVersionedExternalFlow());
final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
flowRegistryClient.addFlowRegistry(flowRegistry);
@@ -184,7 +181,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
}
- final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
+ final StatelessEngine statelessEngine = new StandardStatelessEngine.Builder()
.bulletinRepository(bulletinRepository)
.encryptor(lazyInitializedEncryptor)
.extensionManager(extensionManager)
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index ff03473..c335ae7 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -107,7 +107,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final ProcessContextFactory processContextFactory;
private final RepositoryContextFactory repositoryContextFactory;
private final List<FlowFileQueue> internalFlowFileQueues;
- private final DataflowDefinition<?> dataflowDefinition;
+ private final DataflowDefinition dataflowDefinition;
private final StatelessStateManagerProvider stateManagerProvider;
private final ObjectMapper objectMapper = new ObjectMapper();
private final ProcessScheduler processScheduler;
@@ -122,7 +122,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private volatile Boolean stateful = null;
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
- final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition<?> dataflowDefinition,
+ final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition,
final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
index f178b5f..2ba428a 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
@@ -45,7 +45,7 @@ public class TestPropertiesFileFlowDefinitionParser {
final List<ParameterOverride> parameterOverrides = new ArrayList<>();
final StatelessEngineConfiguration engineConfig = createStatelessEngineConfiguration();
- final DataflowDefinition<?> dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides);
+ final DataflowDefinition dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides);
assertEquals(new HashSet<>(Arrays.asList("foo", "bar", "baz")), dataflowDefinition.getFailurePortNames());
final List<ParameterContextDefinition> contextDefinitions = dataflowDefinition.getParameterContexts();
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 697dd05..f0df861 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -19,7 +19,9 @@ package org.apache.nifi.stateless;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
@@ -165,12 +167,21 @@ public class StatelessSystemIT {
protected StatelessDataflow loadDataflow(final VersionedFlowSnapshot versionedFlowSnapshot, final List<ParameterContextDefinition> parameterContexts,
final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions, final Set<String> failurePortNames,
+ final TransactionThresholds transactionThresholds)
+ throws IOException, StatelessConfigurationException {
+
+ final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot);
+ return loadDataflow(externalFlow, parameterContexts, parameterValueProviderDefinitions, failurePortNames, transactionThresholds);
+ }
+
+ protected StatelessDataflow loadDataflow(final VersionedExternalFlow versionedExternalFlow, final List<ParameterContextDefinition> parameterContexts,
+ final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions, final Set<String> failurePortNames,
final TransactionThresholds transactionThresholds) throws IOException, StatelessConfigurationException {
- final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition = new DataflowDefinition<VersionedFlowSnapshot>() {
+ final DataflowDefinition dataflowDefinition = new DataflowDefinition() {
@Override
- public VersionedFlowSnapshot getFlowSnapshot() {
- return versionedFlowSnapshot;
+ public VersionedExternalFlow getVersionedExternalFlow() {
+ return versionedExternalFlow;
}
@Override
@@ -185,14 +196,14 @@ public class StatelessSystemIT {
@Override
public Set<String> getInputPortNames() {
- return versionedFlowSnapshot.getFlowContents().getInputPorts().stream()
+ return versionedExternalFlow.getFlowContents().getInputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
@Override
public Set<String> getOutputPortNames() {
- return versionedFlowSnapshot.getFlowContents().getOutputPorts().stream()
+ return versionedExternalFlow.getFlowContents().getOutputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java
index 597d301..07381da 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java
@@ -29,7 +29,7 @@ import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java
index 3570fcb..5eeb6e7 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java
@@ -20,8 +20,8 @@ package org.apache.nifi.stateless.parameters;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedParameter;
-import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;