You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:12 UTC
[24/50] nifi git commit: NIFI-4436: Integrate with actual Flow
Registry via REST Client - Store Bucket Name, Flow Name,
Flow Description for VersionControlInformation - Added endpoint for
determining local modifications to a process group - Updated autho
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
index da5880c..9b3ba94 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,6 +38,7 @@ import java.util.UUID;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -150,7 +152,7 @@ public class FileBasedFlowRegistry implements FlowRegistry {
try {
final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
versionedFlows.add(versionedFlow);
- } catch (UnknownResourceException e) {
+ } catch (NiFiRegistryException e) {
continue;
}
}
@@ -164,9 +166,38 @@ public class FileBasedFlowRegistry implements FlowRegistry {
return buckets;
}
+ /**
+  * Looks up a bucket by ID without supplying a user; delegates to
+  * {@link #getBucket(String, NiFiUser)} with a {@code null} user.
+  *
+  * @param bucketId the ID of the bucket to look up
+  * @return whatever the two-argument overload returns for a {@code null} user
+  */
+ @Override
+ public Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException {
+ return getBucket(bucketId, null);
+ }
+
+    /**
+     * Searches the buckets returned by {@code getBuckets(user)} for one whose identifier
+     * equals the given ID.
+     *
+     * @param bucketId the ID to match against each bucket's identifier
+     * @param user the user passed through to {@code getBuckets}
+     * @return the first matching bucket, or {@code null} if no bucket matches
+     */
+    @Override
+    public Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException {
+        for (final Bucket candidate : getBuckets(user)) {
+            if (candidate.getIdentifier().equals(bucketId)) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the versioned flows of the bucket whose identifier equals the given ID,
+     * searching the buckets returned by {@code getBuckets(user)}.
+     *
+     * @param bucketId the ID of the bucket whose flows are wanted
+     * @param user the user passed through to {@code getBuckets}
+     * @return the matching bucket's versioned flows, or an empty set if no bucket matches
+     */
+    @Override
+    public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        for (final Bucket candidate : getBuckets(user)) {
+            if (bucketId.equals(candidate.getIdentifier())) {
+                return candidate.getVersionedFlows();
+            }
+        }
+
+        return Collections.emptySet();
+    }
+
+    /**
+     * Returns the snapshot metadata of the flow whose identifier equals the given flow ID,
+     * searching the flows returned by {@code getFlows(bucketId, user)}.
+     *
+     * @param bucketId the ID of the bucket to search
+     * @param flowId the ID of the flow whose versions are wanted
+     * @param user the user passed through to {@code getFlows}
+     * @return the matching flow's snapshot metadata, or an empty set if no flow matches
+     */
+    @Override
+    public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        for (final VersionedFlow candidate : getFlows(bucketId, user)) {
+            if (flowId.equals(candidate.getIdentifier())) {
+                return candidate.getSnapshotMetadata();
+            }
+        }
+
+        return Collections.emptySet();
+    }
@Override
- public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
+ public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
Objects.requireNonNull(flow);
Objects.requireNonNull(flow.getBucketIdentifier());
Objects.requireNonNull(flow.getName());
@@ -174,7 +205,7 @@ public class FileBasedFlowRegistry implements FlowRegistry {
// Verify that bucket exists
final File bucketDir = new File(directory, flow.getBucketIdentifier());
if (!bucketDir.exists()) {
- throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+ throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
}
// Verify that there is no flow with the same name in that bucket
@@ -213,8 +244,8 @@ public class FileBasedFlowRegistry implements FlowRegistry {
}
@Override
- public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
- throws IOException, UnknownResourceException {
+ public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
+ throws IOException, NiFiRegistryException {
Objects.requireNonNull(flow);
Objects.requireNonNull(flow.getBucketIdentifier());
Objects.requireNonNull(flow.getName());
@@ -223,13 +254,13 @@ public class FileBasedFlowRegistry implements FlowRegistry {
// Verify that the bucket exists
final File bucketDir = new File(directory, flow.getBucketIdentifier());
if (!bucketDir.exists()) {
- throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+ throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
}
// Verify that the flow exists
final File flowDir = new File(bucketDir, flow.getIdentifier());
if (!flowDir.exists()) {
- throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
+ throw new NiFiRegistryException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
}
final File[] versionDirs = flowDir.listFiles();
@@ -291,17 +322,17 @@ public class FileBasedFlowRegistry implements FlowRegistry {
}
@Override
- public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+ public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
// Verify that the bucket exists
final File bucketDir = new File(directory, bucketId);
if (!bucketDir.exists()) {
- throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+ throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
}
// Verify that the flow exists
final File flowDir = new File(bucketDir, flowId);
if (!flowDir.exists()) {
- throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+ throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
}
final File[] versionDirs = flowDir.listFiles();
@@ -329,22 +360,22 @@ public class FileBasedFlowRegistry implements FlowRegistry {
}
@Override
- public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
+ public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, NiFiRegistryException {
// Verify that the bucket exists
final File bucketDir = new File(directory, bucketId);
if (!bucketDir.exists()) {
- throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+ throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
}
// Verify that the flow exists
final File flowDir = new File(bucketDir, flowId);
if (!flowDir.exists()) {
- throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+ throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
}
final File versionDir = new File(flowDir, String.valueOf(version));
if (!versionDir.exists()) {
- throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
+ throw new NiFiRegistryException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
}
final File contentsFile = new File(versionDir, "flow.xml");
@@ -383,17 +414,17 @@ public class FileBasedFlowRegistry implements FlowRegistry {
}
@Override
- public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+ public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
// Verify that the bucket exists
final File bucketDir = new File(directory, bucketId);
if (!bucketDir.exists()) {
- throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+ throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
}
// Verify that the flow exists
final File flowDir = new File(bucketDir, flowId);
if (!flowDir.exists()) {
- throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+ throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
}
final File flowPropsFile = new File(flowDir, "flow.properties");
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
new file mode 100644
index 0000000..26be69b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
+
+/**
+ * {@link FlowRegistry} implementation that delegates every operation to a
+ * {@link NiFiRegistryClient}. The client is built lazily from the current URL and
+ * SSL context and is discarded whenever the URL is changed.
+ */
+public class RestBasedFlowRegistry implements FlowRegistry {
+
+    private final FlowRegistryClient flowRegistryClient;
+    private final String identifier;
+    private final SSLContext sslContext;
+    private volatile String description;
+    private volatile String url;
+    private volatile String name;
+
+    // Only read and written from synchronized methods.
+    private NiFiRegistryClient registryClient;
+
+    public RestBasedFlowRegistry(final FlowRegistryClient flowRegistryClient, final String identifier, final String url, final SSLContext sslContext, final String name) {
+        this.flowRegistryClient = flowRegistryClient;
+        this.identifier = identifier;
+        this.url = url;
+        this.name = name;
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * Returns the cached registry client, building it from the current URL and SSL context
+     * if none is cached.
+     */
+    private synchronized NiFiRegistryClient getRegistryClient() {
+        if (registryClient != null) {
+            return registryClient;
+        }
+
+        final NiFiRegistryClientConfig config = new NiFiRegistryClientConfig.Builder()
+            .connectTimeout(30000)
+            .readTimeout(30000)
+            .sslContext(sslContext)
+            .baseUrl(url)
+            .build();
+
+        registryClient = new JerseyNiFiRegistryClient.Builder()
+            .config(config)
+            .build();
+
+        return registryClient;
+    }
+
+    /** Drops the cached client so that the next call to {@link #getRegistryClient()} rebuilds it. */
+    private synchronized void invalidateClient() {
+        this.registryClient = null;
+    }
+
+    /**
+     * Resolves the identity string handed to the registry client for the given user.
+     * A {@code null} user is treated like an anonymous one so that callers that do not
+     * have a user at hand get a {@code null} identity instead of a NullPointerException.
+     *
+     * @param user the user on whose behalf the request is made; may be {@code null}
+     * @return the user's identity, or {@code null} if the user is {@code null} or anonymous
+     */
+    private static String identityOf(final NiFiUser user) {
+        if (user == null || user.isAnonymous()) {
+            return null;
+        }
+
+        return user.getIdentity();
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public void setDescription(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getURL() {
+        return url;
+    }
+
+    /** Updates the URL and invalidates the cached client, which was built for the previous URL. */
+    @Override
+    public synchronized void setURL(final String url) {
+        this.url = url;
+        invalidateClient();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    /** Returns all buckets reported by the bucket client obtained for the given user's identity. */
+    @Override
+    public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient(identityOf(user));
+        return new HashSet<>(bucketClient.getAll());
+    }
+
+    /** Fetches a bucket by ID using the bucket client obtained without an identity. */
+    @Override
+    public Bucket getBucket(final String bucketId) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient();
+        return bucketClient.get(bucketId);
+    }
+
+    /** Fetches a bucket by ID using the bucket client obtained for the given user's identity. */
+    @Override
+    public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient(identityOf(user));
+        return bucketClient.get(bucketId);
+    }
+
+
+    /** Returns the flows of the given bucket using the flow client obtained for the given user's identity. */
+    @Override
+    public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient(identityOf(user));
+        return new HashSet<>(flowClient.getByBucket(bucketId));
+    }
+
+    /** Returns the snapshot metadata of the given flow using the snapshot client obtained for the given user's identity. */
+    @Override
+    public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(identityOf(user));
+        return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
+    }
+
+    /** Creates the given flow through the flow client and returns the client's result. */
+    @Override
+    public VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient();
+        return flowClient.create(flow);
+    }
+
+    /**
+     * Wraps the given process group in a new snapshot whose metadata is taken from the flow,
+     * the comments, the expected version and the current time, and creates it through the
+     * snapshot client.
+     */
+    @Override
+    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
+        throws IOException, NiFiRegistryException {
+
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
+        final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
+        versionedFlowSnapshot.setFlowContents(snapshot);
+
+        final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+        metadata.setBucketIdentifier(flow.getBucketIdentifier());
+        metadata.setFlowIdentifier(flow.getIdentifier());
+        metadata.setFlowName(flow.getName());
+        metadata.setTimestamp(System.currentTimeMillis());
+        metadata.setVersion(expectedVersion);
+        metadata.setComments(comments);
+
+        versionedFlowSnapshot.setSnapshotMetadata(metadata);
+        return snapshotClient.create(versionedFlowSnapshot);
+    }
+
+    /** Returns the version count reported for the flow, narrowed to an {@code int}. */
+    @Override
+    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+        return (int) getRegistryClient().getFlowClient().get(bucketId, flowId).getVersionCount();
+    }
+
+    /**
+     * Fetches the requested snapshot and then fills in the contents of every descendant
+     * process group that carries versioned flow coordinates.
+     */
+    @Override
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
+        final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
+
+        // Guard against a snapshot without contents or without a child-group collection.
+        final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
+        if (contents != null && contents.getProcessGroups() != null) {
+            for (final VersionedProcessGroup child : contents.getProcessGroups()) {
+                populateVersionedContentsRecursively(child);
+            }
+        }
+
+        return flowSnapshot;
+    }
+
+    /**
+     * If the group carries versioned flow coordinates, fetches the referenced snapshot from
+     * the registry registered for the coordinates' URL and copies its contents into the group;
+     * then repeats the process for each child group.
+     *
+     * @throws NiFiRegistryException if no registry is known for the referenced URL
+     */
+    private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException {
+        if (group == null) {
+            return;
+        }
+
+        final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates();
+        if (coordinates != null) {
+            final String registryUrl = coordinates.getRegistryUrl();
+            final String bucketId = coordinates.getBucketId();
+            final String flowId = coordinates.getFlowId();
+            final int version = coordinates.getVersion();
+
+            final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl);
+            if (registryId == null) {
+                throw new NiFiRegistryException("Flow contains a reference to another Versioned Flow located at URL " + registryUrl
+                    + " but NiFi is not configured to communicate with a Flow Registry at that URL");
+            }
+
+            // The lookup by ID can still come back empty; report that as a registry
+            // error rather than failing with a NullPointerException below.
+            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+            if (flowRegistry == null) {
+                throw new NiFiRegistryException("Flow contains a reference to another Versioned Flow located at URL " + registryUrl
+                    + " but no Flow Registry with ID " + registryId + " could be found");
+            }
+
+            final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version);
+            final VersionedProcessGroup contents = snapshot.getFlowContents();
+
+            group.setComments(contents.getComments());
+            group.setConnections(contents.getConnections());
+            group.setControllerServices(contents.getControllerServices());
+            group.setFunnels(contents.getFunnels());
+            group.setInputPorts(contents.getInputPorts());
+            group.setLabels(contents.getLabels());
+            group.setOutputPorts(contents.getOutputPorts());
+            group.setProcessGroups(contents.getProcessGroups());
+            group.setProcessors(contents.getProcessors());
+            group.setRemoteProcessGroups(contents.getRemoteProcessGroups());
+            group.setVariables(contents.getVariables());
+        }
+
+        // The child collection may have just been replaced above and may be absent.
+        if (group.getProcessGroups() != null) {
+            for (final VersionedProcessGroup child : group.getProcessGroups()) {
+                populateVersionedContentsRecursively(child);
+            }
+        }
+    }
+
+    /** Fetches the flow with the given bucket and flow IDs through the flow client. */
+    @Override
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient();
+        return flowClient.get(bucketId, flowId);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
index 828b970..d5d0d86 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
@@ -23,7 +23,13 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.util.NiFiProperties;
+
public class StandardFlowRegistryClient implements FlowRegistryClient {
+ private NiFiProperties nifiProperties;
private ConcurrentMap<String, FlowRegistry> registryById = new ConcurrentHashMap<>();
@Override
@@ -59,6 +65,16 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
registry.setName(registryName);
registry.setDescription(description);
+ } else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
+ final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
+ if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
+ throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
+ + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+ + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
+ }
+
+ registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName);
+ registry.setDescription(description);
} else {
throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl
+ " because there are no known implementations of Flow Registries that can handle URIs of scheme " + uriScheme);
@@ -72,4 +88,8 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
public FlowRegistry removeFlowRegistry(final String registryId) {
return registryById.remove(registryId);
}
+
+ /**
+  * Stores the NiFi properties on this client. The stored value is what gets passed to
+  * {@code SslContextFactory.createSslContext} when a registry with an http/https URL is added.
+  *
+  * @param nifiProperties the properties to store
+  */
+ public void setProperties(final NiFiProperties nifiProperties) {
+ this.nifiProperties = nifiProperties;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
index 41b98ed..aaba126 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
@@ -17,21 +17,131 @@
package org.apache.nifi.registry.flow;
+import java.util.Objects;
import java.util.Optional;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+
public class StandardVersionControlInformation implements VersionControlInformation {
private final String registryIdentifier;
+ private volatile String registryName;
private final String bucketIdentifier;
+ private volatile String bucketName;
private final String flowIdentifier;
+ private volatile String flowName;
+ private volatile String flowDescription;
private final int version;
private volatile VersionedProcessGroup flowSnapshot;
private volatile Boolean modified = null;
private volatile Boolean current = null;
- public StandardVersionControlInformation(final String registryId, final String bucketId, final String flowId, final int version,
+    /**
+     * Fluent builder for {@link StandardVersionControlInformation}. Registry ID, bucket ID
+     * and flow ID are mandatory and are checked in {@link #build()}; all other values are
+     * optional.
+     */
+    public static class Builder {
+        private String registryIdentifier;
+        private String registryName;
+        private String bucketIdentifier;
+        private String bucketName;
+        private String flowIdentifier;
+        private String flowName;
+        private String flowDescription;
+        private int version;
+        private VersionedProcessGroup flowSnapshot;
+        private Boolean modified = null;
+        private Boolean current = null;
+
+        public Builder registryId(String registryId) {
+            this.registryIdentifier = registryId;
+            return this;
+        }
+
+        public Builder registryName(String registryName) {
+            this.registryName = registryName;
+            return this;
+        }
+
+        public Builder bucketId(String bucketId) {
+            this.bucketIdentifier = bucketId;
+            return this;
+        }
+
+        public Builder bucketName(String bucketName) {
+            this.bucketName = bucketName;
+            return this;
+        }
+
+        public Builder flowId(String flowId) {
+            this.flowIdentifier = flowId;
+            return this;
+        }
+
+        public Builder flowName(String flowName) {
+            this.flowName = flowName;
+            return this;
+        }
+
+        public Builder flowDescription(String flowDescription) {
+            this.flowDescription = flowDescription;
+            return this;
+        }
+
+        public Builder version(int version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder modified(Boolean modified) {
+            this.modified = modified;
+            return this;
+        }
+
+        public Builder current(Boolean current) {
+            this.current = current;
+            return this;
+        }
+
+        public Builder flowSnapshot(VersionedProcessGroup snapshot) {
+            this.flowSnapshot = snapshot;
+            return this;
+        }
+
+        /**
+         * Creates a builder pre-populated from the given DTO. The flow snapshot is not
+         * copied and must be supplied separately if needed.
+         *
+         * @param dto the DTO to copy values from
+         * @return a new builder holding the DTO's values
+         * @throws NullPointerException if the DTO does not carry a version
+         */
+        public static Builder fromDto(VersionControlInformationDTO dto) {
+            Builder builder = new Builder();
+            builder.registryId(dto.getRegistryId())
+                .registryName(dto.getRegistryName())
+                .bucketId(dto.getBucketId())
+                .bucketName(dto.getBucketName())
+                .flowId(dto.getFlowId())
+                .flowName(dto.getFlowName())
+                .flowDescription(dto.getFlowDescription())
+                .current(dto.getCurrent())
+                .modified(dto.getModified())
+                // The version is checked here, before it is converted to the builder's
+                // primitive int, so a missing value fails with a descriptive message.
+                .version(Objects.requireNonNull(dto.getVersion(), "Version must be specified"));
+
+            return builder;
+        }
+
+        /**
+         * Builds the version control information from the values set so far.
+         *
+         * @return the newly created instance
+         * @throws NullPointerException if the registry ID, bucket ID or flow ID was not set
+         */
+        public StandardVersionControlInformation build() {
+            Objects.requireNonNull(registryIdentifier, "Registry ID must be specified");
+            Objects.requireNonNull(bucketIdentifier, "Bucket ID must be specified");
+            Objects.requireNonNull(flowIdentifier, "Flow ID must be specified");
+            // No null check on version: it is a primitive int and can never be null.
+
+            final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName,
+                bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current);
+
+            svci.setBucketName(bucketName);
+            svci.setFlowName(flowName);
+            svci.setFlowDescription(flowDescription);
+
+            return svci;
+        }
+    }
+
+
+ public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) {
this.registryIdentifier = registryId;
+ this.registryName = registryName;
this.bucketIdentifier = bucketId;
this.flowIdentifier = flowId;
this.version = version;
@@ -40,21 +150,58 @@ public class StandardVersionControlInformation implements VersionControlInformat
this.current = current;
}
+
@Override
public String getRegistryIdentifier() {
return registryIdentifier;
}
@Override
+ public String getRegistryName() {
+ return registryName;
+ }
+
+ public void setRegistryName(final String registryName) {
+ this.registryName = registryName;
+ }
+
+ @Override
public String getBucketIdentifier() {
return bucketIdentifier;
}
@Override
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ public void setBucketName(final String bucketName) {
+ this.bucketName = bucketName;
+ }
+
+ @Override
public String getFlowIdentifier() {
return flowIdentifier;
}
+ public void setFlowName(String flowName) {
+ this.flowName = flowName;
+ }
+
+ @Override
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowDescription(String flowDescription) {
+ this.flowDescription = flowDescription;
+ }
+
+ @Override
+ public String getFlowDescription() {
+ return flowDescription;
+ }
+
@Override
public int getVersion() {
return version;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index a75d112..a10a1b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -80,14 +80,11 @@ public class NiFiRegistryFlowMapper {
// created before attempting to create the connection, where the ConnectableDTO is converted.
private Map<String, String> versionedComponentIds = new HashMap<>();
- public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) {
+ public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) {
versionedComponentIds.clear();
- final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true);
+ final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true, mapDescendantVersionedFlows);
- // TODO: Test that this works properly
populateReferencedAncestorServices(group, mapped);
-
- // TODO: Test that this works properly
populateReferencedAncestorVariables(group, mapped);
return mapped;
@@ -149,7 +146,10 @@ public class NiFiRegistryFlowMapper {
if (!implicitlyDefinedVariables.isEmpty()) {
// Merge the implicit variables with the explicitly defined variables for the Process Group
// and set those as the Versioned Group's variables.
- implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
+ if (versionedGroup.getVariables() != null) {
+ implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
+ }
+
versionedGroup.setVariables(implicitlyDefinedVariables);
}
}
@@ -167,7 +167,7 @@ public class NiFiRegistryFlowMapper {
}
- private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) {
+ private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel, final boolean mapDescendantVersionedFlows) {
final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier()));
@@ -192,10 +192,22 @@ public class NiFiRegistryFlowMapper {
coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
coordinates.setVersion(versionControlInfo.getVersion());
+ versionedGroup.setVersionedFlowCoordinates(coordinates);
+
+ // We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports.
+ // Otherwise, we will not be able to lookup the port when connecting to it.
+ for (final Port port : group.getInputPorts()) {
+ getId(port.getVersionedComponentId(), port.getIdentifier());
+ }
+ for (final Port port : group.getOutputPorts()) {
+ getId(port.getVersionedComponentId(), port.getIdentifier());
+ }
// If the Process Group itself is remotely versioned, then we don't want to include its contents
// because the contents are remotely managed and not part of the versioning of this Process Group
- return versionedGroup;
+ if (!mapDescendantVersionedFlows) {
+ return versionedGroup;
+ }
}
}
@@ -228,7 +240,7 @@ public class NiFiRegistryFlowMapper {
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessGroups(group.getProcessGroups().stream()
- .map(grp -> mapGroup(grp, registryClient, false))
+ .map(grp -> mapGroup(grp, registryClient, false, mapDescendantVersionedFlows))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setConnections(group.getConnections().stream()
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 8954f39..9e81d22 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -51,7 +51,7 @@
<xs:element name="id" type="NonEmptyStringType" />
<xs:element name="name" type="NonEmptyStringType" />
<xs:element name="url" type="NonEmptyStringType" />
- <xs:element name="description" type="NonEmptyStringType" />
+ <xs:element name="description" type="xs:string" />
</xs:sequence>
</xs:complexType>
@@ -180,7 +180,10 @@
<xs:sequence>
<xs:element name="registryId" type="NonEmptyStringType" />
<xs:element name="bucketId" type="NonEmptyStringType" />
+ <xs:element name="bucketName" type="NonEmptyStringType" />
<xs:element name="flowId" type="NonEmptyStringType" />
+ <xs:element name="flowName" type="NonEmptyStringType" />
+ <xs:element name="flowDescription" type="xs:string" />
<xs:element name="version" type="NonEmptyStringType" />
</xs:sequence>
</xs:complexType>
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index fc42c62..d9f89aa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -36,7 +36,9 @@
</bean>
<!-- flow registry -->
- <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient" />
+ <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient">
+ <property name="properties" ref="nifiProperties" />
+ </bean>
<!-- flow controller -->
<bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean">
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index e299059..be907ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.web;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser;
@@ -23,7 +31,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
@@ -86,6 +94,7 @@ import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
@@ -115,14 +124,6 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-
/**
* Defines the NiFiServiceFacade interface.
*/
@@ -1274,6 +1275,17 @@ public interface NiFiServiceFacade {
// ----------------------------------------
/**
+ * Returns a FlowComparisonEntity that contains all of the local modifications since the Process Group
+ * was last synchronized with the Flow Registry
+ *
+ * @param processGroupId the ID of the Process Group whose local modifications should be determined
+ * @return a FlowComparisonEntity that contains all of the local modifications since the Process Group
+ * was last synchronized with the Flow Registry
+ * @throws IllegalStateException if the Process Group with the given ID is not under version control
+ */
+ FlowComparisonEntity getLocalModifications(String processGroupId) throws IOException, NiFiRegistryException;
+
+ /**
* Returns the Version Control information for the Process Group with the given ID
*
* @param processGroupId the ID of the Process Group
@@ -1292,7 +1304,7 @@ public interface NiFiServiceFacade {
*
* @throws IOException if unable to communicate with the Flow Registry
*/
- VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, UnknownResourceException;
+ VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, NiFiRegistryException;
/**
* Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry
@@ -1312,11 +1324,13 @@ public interface NiFiServiceFacade {
* @param flow the flow where the snapshot should be persisted
* @param snapshot the Snapshot to persist
* @param comments about the snapshot
+ * @param expectedVersion the version to save the flow as
* @return the snapshot that represents what was stored in the registry
*
* @throws IOException if unable to communicate with the Flow Registry
*/
- VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException;
+ VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion)
+ throws IOException, NiFiRegistryException;
/**
* Updates the Version Control Information on the Process Group with the given ID
@@ -1351,6 +1365,15 @@ public interface NiFiServiceFacade {
VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException;
/**
+ * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
+ * the ID itself as the name
+ *
+ * @param flowRegistryId the id of the flow registry
+ * @return the name of the Flow Registry that is registered with the given ID, or the ID itself if no Flow Registry is registered with the given ID
+ */
+ String getFlowRegistryName(String flowRegistryId);
+
+ /**
* Determines which components currently exist in the Process Group with the given identifier and calculates which of those components
* would be impacted by updating the Process Group to the provided snapshot
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index a319f27..89e00ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -90,7 +90,7 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
@@ -101,11 +101,13 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.FlowComparator;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
@@ -128,6 +130,7 @@ import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ComponentDTO;
+import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ComponentReferenceDTO;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
@@ -205,6 +208,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
@@ -3628,6 +3632,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
+ final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
+
// Create a VersionedProcessGroup snapshot of the flow as it is currently.
final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
@@ -3660,8 +3668,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// add first snapshot to the flow in the registry
final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription();
- registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments);
- } catch (final UnknownResourceException e) {
+ registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments, expectedVersion);
+ } catch (final NiFiRegistryException e) {
throw new IllegalArgumentException(e);
} catch (final IOException ioe) {
// will result in a 500: Internal Server Error
@@ -3671,12 +3679,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(registeredFlow.getBucketIdentifier());
+ vci.setBucketName(registeredFlow.getBucketName());
vci.setCurrent(true);
vci.setFlowId(registeredFlow.getIdentifier());
vci.setFlowName(registeredFlow.getName());
+ vci.setFlowDescription(registeredFlow.getDescription());
vci.setGroupId(groupId);
vci.setModified(false);
vci.setRegistryId(registryId);
+ vci.setRegistryName(getFlowRegistryName(registryId));
vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
@@ -3707,12 +3718,67 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
- final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient);
+ final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
return versionedGroup;
}
@Override
- public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, UnknownResourceException {
+ public FlowComparisonEntity getLocalModifications(final String processGroupId) throws IOException, NiFiRegistryException {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
+ final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
+ if (versionControlInfo == null) {
+ throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
+ }
+
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier());
+ if (flowRegistry == null) {
+ throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier()
+ + " but cannot find a Flow Registry with that identifier");
+ }
+
+ final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
+ versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion());
+
+
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+ final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
+ final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
+
+ final ComparableDataFlow localFlow = new ComparableDataFlow() {
+ @Override
+ public VersionedProcessGroup getContents() {
+ return localGroup;
+ }
+
+ @Override
+ public String getName() {
+ return "Local Flow";
+ }
+ };
+
+ final ComparableDataFlow registryFlow = new ComparableDataFlow() {
+ @Override
+ public VersionedProcessGroup getContents() {
+ return registryGroup;
+ }
+
+ @Override
+ public String getName() {
+ return "Versioned Flow";
+ }
+ };
+
+ final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new EvolvingDifferenceDescriptor());
+ final FlowComparison flowComparison = flowComparator.compare();
+
+ final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
+ final FlowComparisonEntity entity = new FlowComparisonEntity();
+ entity.setComponentDifferences(differenceDtos);
+ return entity;
+ }
+
+ @Override
+ public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
@@ -3721,7 +3787,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return registry.registerVersionedFlow(flow);
}
- private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+ private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
@@ -3732,13 +3798,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
- final VersionedProcessGroup snapshot, final String comments) throws IOException, UnknownResourceException {
+ final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) throws IOException, NiFiRegistryException {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
}
- return registry.registerVersionedFlowSnapshot(flow, snapshot, comments);
+ return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion);
}
@Override
@@ -3778,12 +3844,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
- final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient);
+ final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient, true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
- final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", updatedSnapshot.getFlowContents());
+ final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
- final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow);
+ final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
@@ -3958,7 +4024,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion());
- } catch (final UnknownResourceException e) {
+ } catch (final NiFiRegistryException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
}
@@ -3969,6 +4035,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return snapshot;
}
+ @Override
+ public String getFlowRegistryName(final String flowRegistryId) {
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
+ return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
+ }
+
private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException {
final VersionedProcessGroup group = snapshot.getFlowContents();
@@ -3993,7 +4065,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot childSnapshot;
try {
childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion());
- } catch (final UnknownResourceException e) {
+ } catch (final NiFiRegistryException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
+ remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index b8bdc14..6bf4cca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -39,6 +39,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
@@ -1373,7 +1374,7 @@ public class FlowResource extends ApplicationResource {
value = "The registry id.",
required = true
)
- @PathParam("id") String id) {
+ @PathParam("id") String id) throws NiFiRegistryException {
authorizeFlow();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 11c548f..d24dcbb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -87,8 +87,10 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -123,6 +125,7 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
import org.apache.nifi.web.api.entity.Entity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.FunnelsEntity;
@@ -301,6 +304,53 @@ public class ProcessGroupResource extends ApplicationResource {
/**
+ * Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry
+ *
+ * @param groupId The id of the process group.
+ * @return A FlowComparisonEntity describing the local modifications.
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/local-modifications")
+ @ApiOperation(
+ value = "Gets a list of local modifications to the Process Group since it was last synchronized with the Flow Registry",
+ response = FlowComparisonEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}"),
+ @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getLocalModifications(
+ @ApiParam(
+ value = "The process group id.",
+ required = false
+ )
+ @PathParam("id") final String groupId) throws IOException, NiFiRegistryException {
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+ final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, true, false);
+ });
+
+ final FlowComparisonEntity entity = serviceFacade.getLocalModifications(groupId);
+ return generateOkResponse(entity).build();
+ }
+
+
+ /**
* Retrieves the Variable Registry for the group with the given ID
*
* @param groupId the ID of the Process Group
@@ -1594,6 +1644,13 @@ public class ProcessGroupResource extends ApplicationResource {
// Step 2: Retrieve flow from Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
+ final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+ versionControlInfo.setBucketName(metadata.getBucketName());
+ versionControlInfo.setFlowName(metadata.getFlowName());
+ versionControlInfo.setFlowDescription(metadata.getFlowDescription());
+
+ versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
+
// Step 3: Resolve Bundle info
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
@@ -1635,14 +1692,14 @@ public class ProcessGroupResource extends ApplicationResource {
// create the process group contents
final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId());
- final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent());
+ ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent());
final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (flowSnapshot != null) {
final RevisionDTO revisionDto = entity.getRevision();
final String newGroupId = entity.getComponent().getId();
final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
- serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
+ entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false);
}