You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:03 UTC
[15/50] nifi git commit: NIFI-4436: - Initial checkpoint: able to
start version control and detect changes, in standalone mode,
still 'crude' implementation - Checkpoint: Can place flow under version
control and can determine if modified - Checkpoint: Ch
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
index 94e0745..a91ce97 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
@@ -18,6 +18,7 @@ package org.apache.nifi.connectable;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessSession;
@@ -32,11 +33,12 @@ import java.util.concurrent.TimeUnit;
/**
* Represents a connectable component to which or from which data can flow.
*/
-public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable {
+public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable, VersionedComponent {
/**
* @return the unique identifier for this <code>Connectable</code>
*/
+ @Override
String getIdentifier();
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
index acdcec6..423f52d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
@@ -17,6 +17,7 @@
package org.apache.nifi.connectable;
import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.groups.ProcessGroup;
@@ -27,7 +28,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
-public interface Connection extends Authorizable {
+public interface Connection extends Authorizable, VersionedComponent {
void enqueue(FlowFileRecord flowFile);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index 7190fd4..0240648 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -81,6 +82,7 @@ public abstract class AbstractPort implements Port {
private final AtomicReference<String> penalizationPeriod;
private final AtomicReference<String> yieldPeriod;
private final AtomicReference<String> schedulingPeriod;
+ private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final AtomicLong schedulingNanos;
private final AtomicLong yieldExpiration;
private final ProcessScheduler processScheduler;
@@ -635,4 +637,27 @@ public abstract class AbstractPort implements Port {
@Override
public void verifyCanClearState() {
}
+
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.ofNullable(versionedComponentId.get());
+ }
+
+ @Override
+ public void setVersionedComponentId(final String versionedComponentId) {
+ boolean updated = false;
+ while (!updated) {
+ final String currentId = this.versionedComponentId.get();
+
+ if (currentId == null) {
+ updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+ } else if (currentId.equals(versionedComponentId)) {
+ return;
+ } else if (versionedComponentId == null) {
+ updated = this.versionedComponentId.compareAndSet(currentId, null);
+ } else {
+ throw new IllegalStateException(this + " is already under version control");
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 34ffbac..4b3507c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -43,6 +43,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -77,6 +78,7 @@ public class StandardFunnel implements Funnel {
private final AtomicBoolean lossTolerant;
private final AtomicReference<ScheduledState> scheduledState;
private final AtomicLong yieldExpiration;
+ private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -557,4 +559,27 @@ public class StandardFunnel implements Funnel {
public String getComponentType() {
return "Funnel";
}
+
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.ofNullable(versionedComponentId.get());
+ }
+
+ @Override
+ public void setVersionedComponentId(final String versionedComponentId) {
+ boolean updated = false;
+ while (!updated) {
+ final String currentId = this.versionedComponentId.get();
+
+ if (currentId == null) {
+ updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+ } else if (currentId.equals(versionedComponentId)) {
+ return;
+ } else if (versionedComponentId == null) {
+ updated = this.versionedComponentId.compareAndSet(currentId, null);
+ } else {
+ throw new IllegalStateException(this + " is already under version control");
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java
index bc1be00..d463725 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java
@@ -17,14 +17,16 @@
package org.apache.nifi.controller.label;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.groups.ProcessGroup;
import java.util.Map;
-public interface Label extends ComponentAuthorizable, Positionable {
+public interface Label extends ComponentAuthorizable, Positionable, VersionedComponent {
+ @Override
String getIdentifier();
Map<String, String> getStyle();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 3dd1076..2f28963 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.service;
+import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.LoggableComponent;
@@ -26,7 +27,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
-public interface ControllerServiceNode extends ConfiguredComponent {
+public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent {
/**
* @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 0baba23..8934788 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.groups;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@@ -39,6 +41,10 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.remote.RemoteGroupPort;
/**
@@ -50,7 +56,7 @@ import org.apache.nifi.remote.RemoteGroupPort;
* <p>
* MUST BE THREAD-SAFE</p>
*/
-public interface ProcessGroup extends ComponentAuthorizable, Positionable {
+public interface ProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
/**
* Predicate for filtering schedulable Processors.
@@ -772,6 +778,17 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
void move(final Snippet snippet, final ProcessGroup destination);
/**
+ * Updates the Process Group to match the proposed flow
+ *
+ * @param proposedSnapshot the proposed flow
+ * @param componentIdSeed a seed value to use when generating ID's for new components
+ * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
+ * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
+ * throw an IllegalStateException
+ */
+ void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty);
+
+ /**
* Verifies a template with the specified name can be created.
*
* @param name name of the template
@@ -832,6 +849,18 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
void verifyCanUpdateVariables(Map<String, String> updatedVariables);
/**
+ * Ensures that the contents of the Process Group can be updated to match the given new flow
+ *
+ * @param updatedFlow the updated version of the flow
+ * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed
+ * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If <code>true</code> and the Process Group has been changed since
+ * it was last synchronized with the FlowRegistry, then this method will throw an IllegalStateException
+ *
+ * @throws IllegalStateException if the Process Group is not in a state that will allow the update
+ */
+ void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
+
+ /**
* Adds the given template to this Process Group
*
* @param template the template to add
@@ -894,4 +923,30 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
* @return a set of all components that are affected by the variable with the given name
*/
Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName);
+
+ /**
+ * @return the version control information that indicates where this flow is stored in a Flow Registry,
+ * or <code>null</code> if this Process Group is not under version control.
+ */
+ VersionControlInformation getVersionControlInformation();
+
+ /**
+ * Updates the Version Control Information for this Process Group
+ *
+ * @param versionControlInformation specification of where the flow is tracked in Version Control
+ * @param versionedComponentIds a mapping of component ID's to Versioned Component ID's. This is used to update the components in the
+ * Process Group so that the components that exist in the Process Group can be associated with the corresponding components in the
+ * Version Controlled flow
+ */
+ void setVersionControlInformation(VersionControlInformation versionControlInformation, Map<String, String> versionedComponentIds);
+
+ /**
+ * Synchronizes the Process Group with the given Flow Registry, determining whether or not the local flow
+ * is up to date with the newest version of the flow in the Registry and whether or not the local flow has been
+ * modified since it was last synced with the Flow Registry. If this Process Group is not under Version Control,
+ * this method will have no effect.
+ *
+ * @param flowRegistry the Flow Registry to synchronize with
+ */
+ void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 79b9509..e4da31b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -18,6 +18,7 @@ package org.apache.nifi.groups;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
@@ -30,7 +31,7 @@ import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable {
+public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
@Override
String getIdentifier();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
index b797749..2f9a9fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
@@ -40,6 +40,11 @@ public interface RemoteProcessGroupPortDescriptor {
String getTargetId();
/**
+ * @return the ID corresponding to the component that is under version control
+ */
+ String getVersionedComponentId();
+
+ /**
* @return id of the remote process group that this port resides in
*/
String getGroupId();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
new file mode 100644
index 0000000..a5bb738
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.IOException;
+
+public interface FlowRegistry {
+
+ /**
+ * @return the URL of the Flow Registry
+ */
+ String getURL();
+
+ /**
+ * Registers the given Versioned Flow with the Flow Registry
+ *
+ * @param flow the Versioned Flow to add to the registry
+ * @return the fully populated VersionedFlow
+ *
+ * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null
+ * @throws UnknownResourceException if the bucket id does not exist
+ */
+ VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, UnknownResourceException;
+
+ /**
+ * Adds the given snapshot to the Flow Registry for the given flow
+ *
+ * @param flow the Versioned Flow
+ * @param snapshot the snapshot of the flow
+ * @param comments any comments for the snapshot
+ * @return the versioned flow snapshot
+ *
+ * @throws IOException if unable to communicate with the registry
+ * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null
+ * @throws UnknownResourceException if the flow does not exist
+ */
+ VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException;
+
+ /**
+ * Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow
+ *
+ * @param bucketId the ID of the bucket
+ * @param flowId the ID of the flow
+ * @return the latest version of the Flow
+ *
+ * @throws IOException if unable to communicate with the Flow Registry
+ * @throws UnknownResourceException if unable to find the bucket with the given ID or the flow with the given ID
+ */
+ int getLatestVersion(String bucketId, String flowId) throws IOException, UnknownResourceException;
+
+ /**
+ * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
+ *
+ * @param bucketId the ID of the bucket
+ * @param flowId the ID of the flow
+ * @param version the version to retrieve
+ * @return the contents of the Flow from the Flow Registry
+ *
+ * @throws IOException if unable to communicate with the Flow Registry
+ * @throws UnknownResourceException if unable to find the contents of the flow due to the bucket or flow not existing,
+ * or the specified version of the flow not existing
+ * @throws NullPointerException if any of the arguments is not specified
+ * @throws IllegalArgumentException if the given version is less than 1
+ */
+ VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, UnknownResourceException;
+
+ /**
+ * Retrieves a VersionedFlow by bucket id and flow id
+ *
+ * @param bucketId the ID of the bucket
+ * @param flowId the ID of the flow
+ * @return the VersionedFlow for the given bucket and flow ID's
+ *
+ * @throws IOException if unable to communicate with the Flow Registry
+ * @throws UnknownResourceException if unable to find a flow with the given bucket ID and flow ID
+ */
+ VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, UnknownResourceException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
new file mode 100644
index 0000000..83f66dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.util.Set;
+
+public interface FlowRegistryClient {
+ FlowRegistry getFlowRegistry(String registryId);
+
+ default String getFlowRegistryId(String registryUrl) {
+ for (final String registryClientId : getRegistryIdentifiers()) {
+ final FlowRegistry registry = getFlowRegistry(registryClientId);
+ if (registry.getURL().equals(registryUrl)) {
+ return registryClientId;
+ }
+ }
+
+ return null;
+ }
+
+ Set<String> getRegistryIdentifiers();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java
new file mode 100644
index 0000000..8c95e67
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+public class UnknownResourceException extends Exception {
+
+ public UnknownResourceException(String message) {
+ super(message);
+ }
+
+ public UnknownResourceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UnknownResourceException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
new file mode 100644
index 0000000..ea70b1c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.util.Optional;
+
+/**
+ * <p>
+ * Provides a mechanism for conveying which Flow Registry a flow is stored in, and
+ * where in the Flow Registry the flow is stored.
+ * </p>
+ */
+public interface VersionControlInformation {
+
+ /**
+ * @return the unique identifier of the Flow Registry that this flow is tracking to
+ */
+ String getRegistryIdentifier();
+
+ /**
+ * @return the unique identifier of the bucket that this flow belongs to
+ */
+ String getBucketIdentifier();
+
+ /**
+ * @return the unique identifier of this flow in the Flow Registry
+ */
+ String getFlowIdentifier();
+
+ /**
+ * @return the version of the flow in the Flow Registry that this flow is based on.
+ */
+ int getVersion();
+
+ /**
+ * @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
+ * to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry. An empty optional will be returned
+ * if it is not yet known whether or not the flow has been modified (for example, on startup, when the flow has not yet been
+ * fetched from the Flow Registry)
+ */
+ Optional<Boolean> getModified();
+
+ /**
+ * @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry.
+ * An empty optional will be returned if it is not yet known whether or not this is the most recent version (for example, on startup,
+ * when the flow has not yet been fetched from the Flow Registry)
+ */
+ Optional<Boolean> getCurrent();
+
+ /**
+ * @return the snapshot of the flow that was synchronized with the Flow Registry
+ */
+ VersionedProcessGroup getFlowSnapshot();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index d53eb49..09d032e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -152,6 +152,14 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-data-model</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-flow-diff</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 728e8cf..7aa3003 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -71,6 +72,7 @@ public final class StandardConnection implements Connection {
private final StandardFlowFileQueue flowFileQueue;
private final AtomicInteger labelIndex = new AtomicInteger(1);
private final AtomicLong zIndex = new AtomicLong(0L);
+ private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final ProcessScheduler scheduler;
private final int hashCode;
@@ -519,4 +521,27 @@ public final class StandardConnection implements Connection {
}
}
}
+
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.ofNullable(versionedComponentId.get()); // empty Optional when no ID is currently set
+ }
+
+ @Override
+ public void setVersionedComponentId(final String versionedComponentId) {
+ boolean updated = false;
+ while (!updated) { // retry until a compare-and-set succeeds or one of the branches below exits
+ final String currentId = this.versionedComponentId.get();
+
+ if (currentId == null) {
+ updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); // nothing set yet: accept the given value (which may itself be null)
+ } else if (currentId.equals(versionedComponentId)) {
+ return; // the same ID is already set: nothing to do
+ } else if (versionedComponentId == null) {
+ updated = this.versionedComponentId.compareAndSet(currentId, null); // a null argument clears the existing ID
+ } else {
+ throw new IllegalStateException(this + " is already under version control"); // an existing ID may not be replaced by a different non-null ID
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 56b2590..242ef6a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -164,6 +164,7 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -329,6 +330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
private final LeaderElectionManager leaderElectionManager;
private final ClusterCoordinator clusterCoordinator;
+ private final FlowRegistryClient flowRegistryClient;
/**
* true if controller is configured to operate in a clustered environment
@@ -395,7 +397,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final AuditService auditService,
final StringEncryptor encryptor,
final BulletinRepository bulletinRepo,
- final VariableRegistry variableRegistry) {
+ final VariableRegistry variableRegistry,
+ final FlowRegistryClient flowRegistryClient) {
return new FlowController(
flowFileEventRepo,
@@ -409,7 +412,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/* cluster coordinator */ null,
/* heartbeat monitor */ null,
/* leader election manager */ null,
- /* variable registry */ variableRegistry);
+ /* variable registry */ variableRegistry,
+ flowRegistryClient);
}
public static FlowController createClusteredInstance(
@@ -423,7 +427,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
- final VariableRegistry variableRegistry) {
+ final VariableRegistry variableRegistry,
+ final FlowRegistryClient flowRegistryClient) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
@@ -437,7 +442,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
clusterCoordinator,
heartbeatMonitor,
leaderElectionManager,
- variableRegistry);
+ variableRegistry,
+ flowRegistryClient);
return flowController;
}
@@ -454,7 +460,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
- final VariableRegistry variableRegistry) {
+ final VariableRegistry variableRegistry,
+ final FlowRegistryClient flowRegistryClient) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(5);
@@ -516,6 +523,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startRemoteGroupPortsAfterInitialization = new ArrayList<>();
this.authorizer = authorizer;
this.auditService = auditService;
+ this.flowRegistryClient = flowRegistryClient;
final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
long shutdownSecs;
@@ -754,6 +762,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}, 0L, 30L, TimeUnit.SECONDS);
+ timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { // periodic task: synchronize every process group with the Flow Registry
+ @Override
+ public void run() {
+ final ProcessGroup rootGroup = getRootGroup();
+ final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups(); // NOTE(review): an exception thrown here is outside the try below and would escape run() - confirm the executor keeps rescheduling
+ allGroups.add(rootGroup); // include the root group itself; assumes the returned list is mutable - TODO confirm
+
+ for (final ProcessGroup group : allGroups) {
+ try {
+ group.synchronizeWithFlowRegistry(flowRegistryClient);
+ } catch (final Exception e) {
+ LOG.error("Failed to synchronize {} with Flow Registry", group, e); // log and move on so one failing group does not stop the remaining groups
+ }
+ }
+ }
+ }, 5, 60, TimeUnit.SECONDS); // presumably initialDelay=5s and delay=60s, per the ScheduledExecutorService signature - confirm
+
initialized.set(true);
} finally {
writeLock.unlock();
@@ -3311,6 +3336,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return new HashSet<>(reportingTasks.values());
}
+ public FlowRegistryClient getFlowRegistryClient() { // accessor for the controller's Flow Registry client
+ return flowRegistryClient; // returns the field as-is; no copy or null check is made here
+ }
+
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded) {
final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, additionalUrls, firstTimeAdded);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 3a0b093..e879e38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -85,6 +85,8 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -113,6 +115,7 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
@@ -1048,6 +1051,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId());
processGroup.setComments(processGroupDTO.getComments());
+ processGroup.setVersionedComponentId(processGroupDTO.getVersionedComponentId());
processGroup.setPosition(toPosition(processGroupDTO.getPosition()));
processGroup.setName(processGroupDTO.getName());
processGroup.setParent(parentGroup);
@@ -1072,6 +1076,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
processGroup.setVariables(variables);
+ final VersionControlInformationDTO versionControlInfoDto = processGroupDTO.getVersionControlInformation();
+ if (versionControlInfoDto != null) {
+ final String registryId = versionControlInfoDto.getRegistryId();
+ final String bucketId = versionControlInfoDto.getBucketId();
+ final String flowId = versionControlInfoDto.getFlowId();
+ final int version = versionControlInfoDto.getVersion();
+ final boolean modified = false;
+ final boolean current = true;
+
+ final VersionControlInformation versionControlInformation = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, modified, current);
+ // pass empty map for the version control mapping because the VersionedComponentId has already been set on the components
+ processGroup.setVersionControlInformation(versionControlInformation, Collections.emptyMap());
+ }
+
// Add Controller Services
final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
if (!serviceNodeList.isEmpty()) {
@@ -1097,6 +1115,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), coordinate, false);
+ procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
processGroup.addProcessor(procNode);
updateProcessor(procNode, processorDTO, processGroup, controller);
}
@@ -1113,6 +1132,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
port = controller.createLocalInputPort(portDTO.getId(), portDTO.getName());
}
+ port.setVersionedComponentId(portDTO.getVersionedComponentId());
port.setPosition(toPosition(portDTO.getPosition()));
port.setComments(portDTO.getComments());
port.setProcessGroup(processGroup);
@@ -1156,6 +1176,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
} else {
port = controller.createLocalOutputPort(portDTO.getId(), portDTO.getName());
}
+
+ port.setVersionedComponentId(portDTO.getVersionedComponentId());
port.setPosition(toPosition(portDTO.getPosition()));
port.setComments(portDTO.getComments());
port.setProcessGroup(processGroup);
@@ -1193,6 +1215,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
for (final Element funnelElement : funnelNodeList) {
final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement);
final Funnel funnel = controller.createFunnel(funnelDTO.getId());
+ funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
funnel.setPosition(toPosition(funnelDTO.getPosition()));
// Since this is called during startup, we want to add the funnel without enabling it
@@ -1207,6 +1230,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
for (final Element labelElement : labelNodeList) {
final LabelDTO labelDTO = FlowFromDOMFactory.getLabel(labelElement);
final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel());
+ label.setVersionedComponentId(labelDTO.getVersionedComponentId());
label.setStyle(labelDTO.getStyle());
label.setPosition(toPosition(labelDTO.getPosition()));
@@ -1225,6 +1249,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) {
final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris());
+ remoteGroup.setVersionedComponentId(remoteGroupDto.getVersionedComponentId());
remoteGroup.setComments(remoteGroupDto.getComments());
remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition()));
final String name = remoteGroupDto.getName();
@@ -1332,6 +1357,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
final Connection connection = controller.createConnection(dto.getId(), dto.getName(), source, destination, dto.getSelectedRelationships());
+ connection.setVersionedComponentId(dto.getVersionedComponentId());
connection.setProcessGroup(processGroup);
final List<Position> bendPoints = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 88912aa..187b62f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -126,6 +127,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final AtomicInteger concurrentTaskCount;
private final AtomicLong yieldExpiration;
private final AtomicLong schedulingNanos;
+ private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final ProcessScheduler processScheduler;
private long runNanos = 0L;
private volatile long yieldNanos;
@@ -1511,4 +1513,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
return group == null ? null : group.getIdentifier();
}
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.ofNullable(versionedComponentId.get()); // empty Optional when no ID is currently set
+ }
+
+ @Override
+ public void setVersionedComponentId(final String versionedComponentId) {
+ boolean updated = false;
+ while (!updated) { // retry until a compare-and-set succeeds or one of the branches below exits
+ final String currentId = this.versionedComponentId.get();
+
+ if (currentId == null) {
+ updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); // nothing set yet: accept the given value (which may itself be null)
+ } else if (currentId.equals(versionedComponentId)) {
+ return; // the same ID is already set: nothing to do
+ } else if (versionedComponentId == null) {
+ updated = this.versionedComponentId.compareAndSet(currentId, null); // a null argument clears the existing ID
+ } else {
+ throw new IllegalStateException(this + " is already under version control"); // an existing ID may not be replaced by a different non-null ID
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
index 32e742d..2e98e84 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
@@ -28,6 +28,7 @@ import org.apache.nifi.util.CharacterFilterUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class StandardLabel implements Label {
@@ -38,6 +39,7 @@ public class StandardLabel implements Label {
private final AtomicReference<Map<String, String>> style;
private final AtomicReference<String> value;
private final AtomicReference<ProcessGroup> processGroup;
+ private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
public StandardLabel(final String identifier, final String value) {
this(identifier, new Position(0D, 0D), new HashMap<String, String>(), value, null);
@@ -76,6 +78,7 @@ public class StandardLabel implements Label {
}
}
+ @Override
public String getIdentifier() {
return identifier;
}
@@ -96,10 +99,12 @@ public class StandardLabel implements Label {
return ResourceFactory.getComponentResource(ResourceType.Label, getIdentifier(),"Label");
}
+ @Override
public Map<String, String> getStyle() {
return style.get();
}
+ @Override
public void setStyle(final Map<String, String> style) {
if (style != null) {
boolean updated = false;
@@ -112,19 +117,46 @@ public class StandardLabel implements Label {
}
}
+ @Override
public String getValue() {
return value.get();
}
+ @Override
public void setValue(final String value) {
this.value.set(CharacterFilterUtils.filterInvalidXmlCharacters(value));
}
+ @Override
public void setProcessGroup(final ProcessGroup group) {
this.processGroup.set(group);
}
+ @Override
public ProcessGroup getProcessGroup() {
return processGroup.get();
}
+
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.ofNullable(versionedComponentId.get()); // empty Optional when no ID is currently set
+ }
+
+ @Override
+ public void setVersionedComponentId(final String versionedComponentId) {
+ boolean updated = false;
+ while (!updated) { // retry until a compare-and-set succeeds or one of the branches below exits
+ final String currentId = this.versionedComponentId.get();
+
+ if (currentId == null) {
+ updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); // nothing set yet: accept the given value (which may itself be null)
+ } else if (currentId.equals(versionedComponentId)) {
+ return; // the same ID is already set: nothing to do
+ } else if (versionedComponentId == null) {
+ updated = this.versionedComponentId.compareAndSet(currentId, null); // a null argument clears the existing ID
+ } else {
+ throw new IllegalStateException(this + " is already under version control"); // an existing ID may not be replaced by a different non-null ID
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index f67ecd9..a2a589a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -50,6 +50,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
@@ -102,6 +103,7 @@ public class FlowFromDOMFactory {
final ControllerServiceDTO dto = new ControllerServiceDTO();
dto.setId(getString(element, "id"));
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
dto.setName(getString(element, "name"));
dto.setComments(getString(element, "comment"));
dto.setType(getString(element, "class"));
@@ -138,6 +140,7 @@ public class FlowFromDOMFactory {
final ProcessGroupDTO dto = new ProcessGroupDTO();
final String groupId = getString(element, "id");
dto.setId(groupId);
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
dto.setParentGroupId(parentId);
dto.setName(getString(element, "name"));
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
@@ -153,6 +156,9 @@ public class FlowFromDOMFactory {
}
dto.setVariables(variables);
+ final Element versionControlInfoElement = DomUtils.getChild(element, "versionControlInformation");
+ dto.setVersionControlInformation(getVersionControlInformation(versionControlInfoElement));
+
final Set<ProcessorDTO> processors = new HashSet<>();
final Set<ConnectionDTO> connections = new HashSet<>();
final Set<FunnelDTO> funnels = new HashSet<>();
@@ -216,12 +222,26 @@ public class FlowFromDOMFactory {
return dto;
}
+ private static VersionControlInformationDTO getVersionControlInformation(final Element versionControlInfoElement) { // builds a DTO from a version control information element
+ if (versionControlInfoElement == null) {
+ return null; // no element supplied: return null rather than an empty DTO
+ }
+
+ final VersionControlInformationDTO dto = new VersionControlInformationDTO();
+ dto.setRegistryId(getString(versionControlInfoElement, "registryId")); // the four values are looked up by name: registryId, bucketId, flowId, version
+ dto.setBucketId(getString(versionControlInfoElement, "bucketId"));
+ dto.setFlowId(getString(versionControlInfoElement, "flowId"));
+ dto.setVersion(getInt(versionControlInfoElement, "version")); // NOTE(review): behavior of getInt when "version" is missing or non-numeric is not visible here - confirm
+ return dto;
+ }
+
public static ConnectionDTO getConnection(final Element element) {
final ConnectionDTO dto = new ConnectionDTO();
dto.setId(getString(element, "id"));
dto.setName(getString(element, "name"));
dto.setLabelIndex(getOptionalInt(element, "labelIndex"));
dto.setzIndex(getOptionalLong(element, "zIndex"));
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
final List<PositionDTO> bends = new ArrayList<>();
final Element bendPointsElement = DomUtils.getChild(element, "bendPoints");
@@ -278,6 +298,7 @@ public class FlowFromDOMFactory {
public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element, final StringEncryptor encryptor) {
final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
dto.setId(getString(element, "id"));
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
dto.setName(getString(element, "name"));
dto.setTargetUri(getString(element, "url"));
dto.setTargetUris(getString(element, "urls"));
@@ -302,6 +323,7 @@ public class FlowFromDOMFactory {
public static LabelDTO getLabel(final Element element) {
final LabelDTO dto = new LabelDTO();
dto.setId(getString(element, "id"));
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
dto.setLabel(getString(element, "value"));
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
final Size size = getSize(DomUtils.getChild(element, "size"));
@@ -315,6 +337,7 @@ public class FlowFromDOMFactory {
public static FunnelDTO getFunnel(final Element element) {
final FunnelDTO dto = new FunnelDTO();
dto.setId(getString(element, "id"));
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
return dto;
@@ -323,6 +346,7 @@ public class FlowFromDOMFactory {
public static PortDTO getPort(final Element element) {
final PortDTO portDTO = new PortDTO();
portDTO.setId(getString(element, "id"));
+ portDTO.setVersionedComponentId(getString(element, "versionedComponentId"));
portDTO.setPosition(getPosition(DomUtils.getChild(element, "position")));
portDTO.setName(getString(element, "name"));
portDTO.setComments(getString(element, "comments"));
@@ -370,6 +394,7 @@ public class FlowFromDOMFactory {
final String targetId = getString(element, "targetId");
descriptor.setTargetId(targetId == null ? id : targetId);
+ descriptor.setVersionedComponentId(getString(element, "versionedComponentId"));
descriptor.setName(getString(element, "name"));
descriptor.setComments(getString(element, "comments"));
descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
@@ -386,6 +411,7 @@ public class FlowFromDOMFactory {
final ProcessorDTO dto = new ProcessorDTO();
dto.setId(getString(element, "id"));
+ dto.setVersionedComponentId(getString(element, "versionedComponentId"));
dto.setName(getString(element, "name"));
dto.setType(getString(element, "class"));
dto.setBundle(getBundle(DomUtils.getChild(element, "bundle")));
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index bc28a25..ecf2438 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -39,6 +39,7 @@ import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.util.CharacterFilterUtils;
@@ -63,6 +64,7 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -151,10 +153,21 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
addTextElement(element, "id", group.getIdentifier());
+ addTextElement(element, "versionedComponentId", group.getVersionedComponentId());
addTextElement(element, "name", group.getName());
addPosition(element, group.getPosition());
addTextElement(element, "comment", group.getComments());
+ final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
+ if (versionControlInfo != null) {
+ final Element versionControlInfoElement = doc.createElement("versionControlInformation");
+ addTextElement(versionControlInfoElement, "registryId", versionControlInfo.getRegistryIdentifier());
+ addTextElement(versionControlInfoElement, "bucketId", versionControlInfo.getBucketIdentifier());
+ addTextElement(versionControlInfoElement, "flowId", versionControlInfo.getFlowIdentifier());
+ addTextElement(versionControlInfoElement, "version", versionControlInfo.getVersion());
+ element.appendChild(versionControlInfoElement);
+ }
+
for (final ProcessorNode processor : group.getProcessors()) {
addProcessor(element, processor, scheduledStateLookup);
}
@@ -258,6 +271,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement("label");
parentElement.appendChild(element);
addTextElement(element, "id", label.getIdentifier());
+ addTextElement(element, "versionedComponentId", label.getVersionedComponentId());
addPosition(element, label.getPosition());
addSize(element, label.getSize());
@@ -272,6 +286,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement("funnel");
parentElement.appendChild(element);
addTextElement(element, "id", funnel.getIdentifier());
+ addTextElement(element, "versionedComponentId", funnel.getVersionedComponentId());
addPosition(element, funnel.getPosition());
}
@@ -280,6 +295,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement("remoteProcessGroup");
parentElement.appendChild(element);
addTextElement(element, "id", remoteRef.getIdentifier());
+ addTextElement(element, "versionedComponentId", remoteRef.getVersionedComponentId());
addTextElement(element, "name", remoteRef.getName());
addPosition(element, remoteRef.getPosition());
addTextElement(element, "comment", remoteRef.getComments());
@@ -322,6 +338,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
addTextElement(element, "id", port.getIdentifier());
+ addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
addTextElement(element, "name", port.getName());
addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments());
@@ -350,6 +367,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
addTextElement(element, "id", port.getIdentifier());
+ addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
addTextElement(element, "name", port.getName());
addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments());
@@ -363,6 +381,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
addTextElement(element, "id", port.getIdentifier());
+ addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
addTextElement(element, "name", port.getName());
addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments());
@@ -383,6 +402,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement("processor");
parentElement.appendChild(element);
addTextElement(element, "id", processor.getIdentifier());
+ addTextElement(element, "versionedComponentId", processor.getVersionedComponentId());
addTextElement(element, "name", processor.getName());
addPosition(element, processor.getPosition());
@@ -444,6 +464,7 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element element = doc.createElement("connection");
parentElement.appendChild(element);
addTextElement(element, "id", connection.getIdentifier());
+ addTextElement(element, "versionedComponentId", connection.getVersionedComponentId());
addTextElement(element, "name", connection.getName());
final Element bendPointsElement = doc.createElement("bendPoints");
@@ -500,6 +521,7 @@ public class StandardFlowSerializer implements FlowSerializer {
public void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
addTextElement(serviceElement, "id", serviceNode.getIdentifier());
+ addTextElement(serviceElement, "versionedComponentId", serviceNode.getVersionedComponentId());
addTextElement(serviceElement, "name", serviceNode.getName());
addTextElement(serviceElement, "comment", serviceNode.getComments());
addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName());
@@ -544,6 +566,17 @@ public class StandardFlowSerializer implements FlowSerializer {
element.appendChild(toAdd);
}
+ // Adds a child element called 'name' carrying the Optional's text; an empty Optional adds nothing.
+ private static void addTextElement(final Element element, final String name, final Optional<String> value) {
+ if (value.isPresent()) {
+ final Element child = element.getOwnerDocument().createElement(name);
+ // value should already be filtered, but just in case ensure there are no invalid xml characters
+ final String text = CharacterFilterUtils.filterInvalidXmlCharacters(value.get());
+ child.setTextContent(text);
+ element.appendChild(child);
+ }
+ }
+
public static void addTemplate(final Element element, final Template template) {
try {
final byte[] serialized = TemplateSerializer.serialize(template.getDetails());
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 3faffd7..633f0ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -198,6 +198,7 @@ public class ControllerServiceLoader {
final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false);
node.setName(dto.getName());
node.setComments(dto.getComments());
+ node.setVersionedComponentId(dto.getVersionedComponentId());
return node;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index f46f796..53fd166 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
@@ -72,6 +73,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null);
private final ControllerServiceProvider serviceProvider;
private final ServiceStateTransition stateTransition = new ServiceStateTransition();
+ private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -526,4 +528,26 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
return results != null ? results : Collections.emptySet();
}
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.ofNullable(this.versionedComponentId.get()); // empty Optional while the stored id is null
+ }
+
+ @Override
+ public void setVersionedComponentId(final String versionedComponentId) { // stores, re-confirms, or (for a null argument) clears the id; any other change throws
+ boolean updated = false;
+ while (!updated) { // loop again whenever a compareAndSet below reports failure
+ final String currentId = this.versionedComponentId.get();
+
+ if (currentId == null) {
+ updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); // no id stored yet: try to store the given one
+ } else if (currentId.equals(versionedComponentId)) {
+ return; // the same id is already stored: nothing to change
+ } else if (versionedComponentId == null) {
+ updated = this.versionedComponentId.compareAndSet(currentId, null); // null argument: try to clear the stored id
+ } else {
+ throw new IllegalStateException(this + " is already under version control"); // a different non-null id is already stored
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 7c68475..3aa5084 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -268,6 +268,17 @@ public class FingerprintFactory {
private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId"));
+
+ final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
+ if (versionControlInfo == null) {
+ builder.append("NO_VERSION_CONTROL_INFORMATION");
+ } else {
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "registryId"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "bucketId"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "flowId"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "version"));
+ }
// processors
final List<Element> processorElems = DomUtils.getChildElementsByTagName(processGroupElem, "processor");
@@ -344,6 +355,7 @@ public class FingerprintFactory {
private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "versionedComponentId"));
// class
final NodeList childNodes = DomUtils.getChildNodesByTagName(processorElem, "class");
final String className = childNodes.item(0).getTextContent();
@@ -435,6 +447,7 @@ public class FingerprintFactory {
private StringBuilder addPortFingerprint(final StringBuilder builder, final Element portElem) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "id"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "versionedComponentId"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "name"));
final NodeList userAccessControlNodeList = DomUtils.getChildNodesByTagName(portElem, "userAccessControl");
@@ -471,13 +484,14 @@ public class FingerprintFactory {
private StringBuilder addLabelFingerprint(final StringBuilder builder, final Element labelElem) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "id"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "versionedComponentId")); // added by this change: versionedComponentId goes into the builder between id and value
appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "value"));
return builder;
}
private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException {
- for (String tagName : new String[]{"id", "urls", "networkInterface", "timeout", "yieldPeriod",
+ for (String tagName : new String[] {"id", "versionedComponentId", "urls", "networkInterface", "timeout", "yieldPeriod",
"transportProtocol", "proxyHost", "proxyPort", "proxyUser", "proxyPassword"}) {
final String value = getFirstValue(DomUtils.getChildNodesByTagName(remoteProcessGroupElem, tagName));
if (isEncrypted(value)) {
@@ -544,7 +558,7 @@ public class FingerprintFactory {
}
private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) {
- for (final String childName : new String[] {"id", "targetId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
+ for (final String childName : new String[] {"id", "targetId", "versionedComponentId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
}
@@ -555,6 +569,7 @@ public class FingerprintFactory {
private StringBuilder addConnectionFingerprint(final StringBuilder builder, final Element connectionElem) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "id"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "versionedComponentId"));
// source id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "sourceId"));
// source group id
@@ -583,11 +598,13 @@ public class FingerprintFactory {
private StringBuilder addFunnelFingerprint(final StringBuilder builder, final Element funnelElem) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(funnelElem, "id"));
+ appendFirstValue(builder, DomUtils.getChildNodesByTagName(funnelElem, "versionedComponentId")); // added by this change: versionedComponentId goes into the builder right after id
return builder;
}
private void addControllerServiceFingerprint(final StringBuilder builder, final ControllerServiceDTO dto) {
builder.append(dto.getId());
+ builder.append(dto.getVersionedComponentId());
builder.append(dto.getType());
builder.append(dto.getName());