You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:12 UTC

[24/50] nifi git commit: NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated autho

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
index da5880c..9b3ba94 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,6 +38,7 @@ import java.util.UUID;
 
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -150,7 +152,7 @@ public class FileBasedFlowRegistry implements FlowRegistry {
                     try {
                         final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
                         versionedFlows.add(versionedFlow);
-                    } catch (UnknownResourceException e) {
+                    } catch (NiFiRegistryException e) {
                         continue;
                     }
                 }
@@ -164,9 +166,38 @@ public class FileBasedFlowRegistry implements FlowRegistry {
         return buckets;
     }
 
+    @Override
+    public Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException {
+        return getBucket(bucketId, null);
+    }
+
+    @Override
+    public Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException {
+        return getBuckets(user).stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null);
+    }
+
+    @Override
+    public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final Bucket bucket = getBuckets(user).stream().filter(b -> bucketId.equals(b.getIdentifier())).findFirst().orElse(null);
+        if (bucket == null) {
+            return Collections.emptySet();
+        }
+
+        return bucket.getVersionedFlows();
+    }
+
+    @Override
+    public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final VersionedFlow flow = getFlows(bucketId, user).stream().filter(f -> flowId.equals(f.getIdentifier())).findFirst().orElse(null);
+        if (flow == null) {
+            return Collections.emptySet();
+        }
+
+        return flow.getSnapshotMetadata();
+    }
 
     @Override
-    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
+    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
         Objects.requireNonNull(flow);
         Objects.requireNonNull(flow.getBucketIdentifier());
         Objects.requireNonNull(flow.getName());
@@ -174,7 +205,7 @@ public class FileBasedFlowRegistry implements FlowRegistry {
         // Verify that bucket exists
         final File bucketDir = new File(directory, flow.getBucketIdentifier());
         if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+            throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
         }
 
         // Verify that there is no flow with the same name in that bucket
@@ -213,8 +244,8 @@ public class FileBasedFlowRegistry implements FlowRegistry {
     }
 
     @Override
-    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
-            throws IOException, UnknownResourceException {
+    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
+        throws IOException, NiFiRegistryException {
         Objects.requireNonNull(flow);
         Objects.requireNonNull(flow.getBucketIdentifier());
         Objects.requireNonNull(flow.getName());
@@ -223,13 +254,13 @@ public class FileBasedFlowRegistry implements FlowRegistry {
         // Verify that the bucket exists
         final File bucketDir = new File(directory, flow.getBucketIdentifier());
         if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+            throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
         }
 
         // Verify that the flow exists
         final File flowDir = new File(bucketDir, flow.getIdentifier());
         if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
+            throw new NiFiRegistryException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
         }
 
         final File[] versionDirs = flowDir.listFiles();
@@ -291,17 +322,17 @@ public class FileBasedFlowRegistry implements FlowRegistry {
     }
 
     @Override
-    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
         // Verify that the bucket exists
         final File bucketDir = new File(directory, bucketId);
         if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+            throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
         }
 
         // Verify that the flow exists
         final File flowDir = new File(bucketDir, flowId);
         if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+            throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
         }
 
         final File[] versionDirs = flowDir.listFiles();
@@ -329,22 +360,22 @@ public class FileBasedFlowRegistry implements FlowRegistry {
     }
 
     @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, NiFiRegistryException {
         // Verify that the bucket exists
         final File bucketDir = new File(directory, bucketId);
         if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+            throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
         }
 
         // Verify that the flow exists
         final File flowDir = new File(bucketDir, flowId);
         if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+            throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
         }
 
         final File versionDir = new File(flowDir, String.valueOf(version));
         if (!versionDir.exists()) {
-            throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
+            throw new NiFiRegistryException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
         }
 
         final File contentsFile = new File(versionDir, "flow.xml");
@@ -383,17 +414,17 @@ public class FileBasedFlowRegistry implements FlowRegistry {
     }
 
     @Override
-    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
         // Verify that the bucket exists
         final File bucketDir = new File(directory, bucketId);
         if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+            throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
         }
 
         // Verify that the flow exists
         final File flowDir = new File(bucketDir, flowId);
         if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+            throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
         }
 
         final File flowPropsFile = new File(flowDir, "flow.properties");

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
new file mode 100644
index 0000000..26be69b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
+
+public class RestBasedFlowRegistry implements FlowRegistry {
+
+    private final FlowRegistryClient flowRegistryClient;
+    private final String identifier;
+    private final SSLContext sslContext;
+    private volatile String description;
+    private volatile String url;
+    private volatile String name;
+
+    private NiFiRegistryClient registryClient;
+
+    public RestBasedFlowRegistry(final FlowRegistryClient flowRegistryClient, final String identifier, final String url, final SSLContext sslContext, final String name) {
+        this.flowRegistryClient = flowRegistryClient;
+        this.identifier = identifier;
+        this.url = url;
+        this.name = name;
+        this.sslContext = sslContext;
+    }
+
+    private synchronized NiFiRegistryClient getRegistryClient() {
+        if (registryClient != null) {
+            return registryClient;
+        }
+
+        final NiFiRegistryClientConfig config = new NiFiRegistryClientConfig.Builder()
+            .connectTimeout(30000)
+            .readTimeout(30000)
+            .sslContext(sslContext)
+            .baseUrl(url)
+            .build();
+
+        registryClient = new JerseyNiFiRegistryClient.Builder()
+            .config(config)
+            .build();
+
+        return registryClient;
+    }
+
+    private synchronized void invalidateClient() {
+        this.registryClient = null;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public void setDescription(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getURL() {
+        return url;
+    }
+
+    @Override
+    public synchronized void setURL(final String url) {
+        this.url = url;
+        invalidateClient();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity());
+        return new HashSet<>(bucketClient.getAll());
+    }
+
+    @Override
+    public Bucket getBucket(final String bucketId) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient();
+        return bucketClient.get(bucketId);
+    }
+
+    @Override
+    public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity());
+        return bucketClient.get(bucketId);
+    }
+
+
+    @Override
+    public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient(user.isAnonymous() ? null : user.getIdentity());
+        return new HashSet<>(flowClient.getByBucket(bucketId));
+    }
+
+    @Override
+    public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(user.isAnonymous() ? null : user.getIdentity());
+        return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
+    }
+
+    @Override
+    public VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient();
+        return flowClient.create(flow);
+    }
+
+    @Override
+    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
+            throws IOException, NiFiRegistryException {
+
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
+        final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
+        versionedFlowSnapshot.setFlowContents(snapshot);
+
+        final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+        metadata.setBucketIdentifier(flow.getBucketIdentifier());
+        metadata.setFlowIdentifier(flow.getIdentifier());
+        metadata.setFlowName(flow.getName());
+        metadata.setTimestamp(System.currentTimeMillis());
+        metadata.setVersion(expectedVersion);
+        metadata.setComments(comments);
+
+        versionedFlowSnapshot.setSnapshotMetadata(metadata);
+        return snapshotClient.create(versionedFlowSnapshot);
+    }
+
+    @Override
+    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+        return (int) getRegistryClient().getFlowClient().get(bucketId, flowId).getVersionCount();
+    }
+
+    @Override
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
+        final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
+
+        final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
+        for (final VersionedProcessGroup child : contents.getProcessGroups()) {
+            populateVersionedContentsRecursively(child);
+        }
+
+        return flowSnapshot;
+    }
+
+    private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException {
+        if (group == null) {
+            return;
+        }
+
+        final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates();
+        if (coordinates != null) {
+            final String registryUrl = coordinates.getRegistryUrl();
+            final String bucketId = coordinates.getBucketId();
+            final String flowId = coordinates.getFlowId();
+            final int version = coordinates.getVersion();
+
+            final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl);
+            if (registryId == null) {
+                throw new NiFiRegistryException("Flow contains a reference to another Versioned Flow located at URL " + registryUrl
+                    + " but NiFi is not configured to communicate with a Flow Registry at that URL");
+            }
+
+            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+            final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version);
+            final VersionedProcessGroup contents = snapshot.getFlowContents();
+
+            group.setComments(contents.getComments());
+            group.setConnections(contents.getConnections());
+            group.setControllerServices(contents.getControllerServices());
+            group.setFunnels(contents.getFunnels());
+            group.setInputPorts(contents.getInputPorts());
+            group.setLabels(contents.getLabels());
+            group.setOutputPorts(contents.getOutputPorts());
+            group.setProcessGroups(contents.getProcessGroups());
+            group.setProcessors(contents.getProcessors());
+            group.setRemoteProcessGroups(contents.getRemoteProcessGroups());
+            group.setVariables(contents.getVariables());
+        }
+
+        for (final VersionedProcessGroup child : group.getProcessGroups()) {
+            populateVersionedContentsRecursively(child);
+        }
+    }
+
+    @Override
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient();
+        return flowClient.get(bucketId, flowId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
index 828b970..d5d0d86 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
@@ -23,7 +23,13 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.util.NiFiProperties;
+
 public class StandardFlowRegistryClient implements FlowRegistryClient {
+    private NiFiProperties nifiProperties;
     private ConcurrentMap<String, FlowRegistry> registryById = new ConcurrentHashMap<>();
 
     @Override
@@ -59,6 +65,16 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
 
             registry.setName(registryName);
             registry.setDescription(description);
+        } else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
+            final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
+            if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
+                throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
+                    + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+                    + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
+            }
+
+            registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName);
+            registry.setDescription(description);
         } else {
             throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl
                 + " because there are no known implementations of Flow Registries that can handle URIs of scheme " + uriScheme);
@@ -72,4 +88,8 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
     public FlowRegistry removeFlowRegistry(final String registryId) {
         return registryById.remove(registryId);
     }
+
+    public void setProperties(final NiFiProperties nifiProperties) {
+        this.nifiProperties = nifiProperties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
index 41b98ed..aaba126 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
@@ -17,21 +17,131 @@
 
 package org.apache.nifi.registry.flow;
 
+import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+
 public class StandardVersionControlInformation implements VersionControlInformation {
 
     private final String registryIdentifier;
+    private volatile String registryName;
     private final String bucketIdentifier;
+    private volatile String bucketName;
     private final String flowIdentifier;
+    private volatile String flowName;
+    private volatile String flowDescription;
     private final int version;
     private volatile VersionedProcessGroup flowSnapshot;
     private volatile Boolean modified = null;
     private volatile Boolean current = null;
 
-    public StandardVersionControlInformation(final String registryId, final String bucketId, final String flowId, final int version,
+    public static class Builder {
+        private String registryIdentifier;
+        private String registryName;
+        private String bucketIdentifier;
+        private String bucketName;
+        private String flowIdentifier;
+        private String flowName;
+        private String flowDescription;
+        private int version;
+        private VersionedProcessGroup flowSnapshot;
+        private Boolean modified = null;
+        private Boolean current = null;
+
+        public Builder registryId(String registryId) {
+            this.registryIdentifier = registryId;
+            return this;
+        }
+
+        public Builder registryName(String registryName) {
+            this.registryName = registryName;
+            return this;
+        }
+
+        public Builder bucketId(String bucketId) {
+            this.bucketIdentifier = bucketId;
+            return this;
+        }
+
+        public Builder bucketName(String bucketName) {
+            this.bucketName = bucketName;
+            return this;
+        }
+
+        public Builder flowId(String flowId) {
+            this.flowIdentifier = flowId;
+            return this;
+        }
+
+        public Builder flowName(String flowName) {
+            this.flowName = flowName;
+            return this;
+        }
+
+        public Builder flowDescription(String flowDescription) {
+            this.flowDescription = flowDescription;
+            return this;
+        }
+
+        public Builder version(int version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder modified(Boolean modified) {
+            this.modified = modified;
+            return this;
+        }
+
+        public Builder current(Boolean current) {
+            this.current = current;
+            return this;
+        }
+
+        public Builder flowSnapshot(VersionedProcessGroup snapshot) {
+            this.flowSnapshot = snapshot;
+            return this;
+        }
+
+        public static Builder fromDto(VersionControlInformationDTO dto) {
+            Builder builder = new Builder();
+            builder.registryId(dto.getRegistryId())
+                .registryName(dto.getRegistryName())
+                .bucketId(dto.getBucketId())
+                .bucketName(dto.getBucketName())
+                .flowId(dto.getFlowId())
+                .flowName(dto.getFlowName())
+                .flowDescription(dto.getFlowDescription())
+                .current(dto.getCurrent())
+                .modified(dto.getModified())
+                .version(dto.getVersion());
+
+            return builder;
+        }
+
+        public StandardVersionControlInformation build() {
+            Objects.requireNonNull(registryIdentifier, "Registry ID must be specified");
+            Objects.requireNonNull(bucketIdentifier, "Bucket ID must be specified");
+            Objects.requireNonNull(flowIdentifier, "Flow ID must be specified");
+            Objects.requireNonNull(version, "Version must be specified");
+
+            final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName,
+                bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current);
+
+            svci.setBucketName(bucketName);
+            svci.setFlowName(flowName);
+            svci.setFlowDescription(flowDescription);
+
+            return svci;
+        }
+    }
+
+
+    public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
         final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) {
         this.registryIdentifier = registryId;
+        this.registryName = registryName;
         this.bucketIdentifier = bucketId;
         this.flowIdentifier = flowId;
         this.version = version;
@@ -40,21 +150,58 @@ public class StandardVersionControlInformation implements VersionControlInformat
         this.current = current;
     }
 
+
     @Override
     public String getRegistryIdentifier() {
         return registryIdentifier;
     }
 
     @Override
+    public String getRegistryName() {
+        return registryName;
+    }
+
+    public void setRegistryName(final String registryName) {
+        this.registryName = registryName;
+    }
+
+    @Override
     public String getBucketIdentifier() {
         return bucketIdentifier;
     }
 
     @Override
+    public String getBucketName() {
+        return bucketName;
+    }
+
+    public void setBucketName(final String bucketName) {
+        this.bucketName = bucketName;
+    }
+
+    @Override
     public String getFlowIdentifier() {
         return flowIdentifier;
     }
 
+    public void setFlowName(String flowName) {
+        this.flowName = flowName;
+    }
+
+    @Override
+    public String getFlowName() {
+        return flowName;
+    }
+
+    public void setFlowDescription(String flowDescription) {
+        this.flowDescription = flowDescription;
+    }
+
+    @Override
+    public String getFlowDescription() {
+        return flowDescription;
+    }
+
     @Override
     public int getVersion() {
         return version;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index a75d112..a10a1b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -80,14 +80,11 @@ public class NiFiRegistryFlowMapper {
     // created before attempting to create the connection, where the ConnectableDTO is converted.
     private Map<String, String> versionedComponentIds = new HashMap<>();
 
-    public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) {
+    public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) {
         versionedComponentIds.clear();
-        final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true);
+        final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true, mapDescendantVersionedFlows);
 
-        // TODO: Test that this works properly
         populateReferencedAncestorServices(group, mapped);
-
-        // TODO: Test that this works properly
         populateReferencedAncestorVariables(group, mapped);
 
         return mapped;
@@ -149,7 +146,10 @@ public class NiFiRegistryFlowMapper {
         if (!implicitlyDefinedVariables.isEmpty()) {
             // Merge the implicit variables with the explicitly defined variables for the Process Group
             // and set those as the Versioned Group's variables.
-            implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
+            if (versionedGroup.getVariables() != null) {
+                implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
+            }
+
             versionedGroup.setVariables(implicitlyDefinedVariables);
         }
     }
@@ -167,7 +167,7 @@ public class NiFiRegistryFlowMapper {
     }
 
 
-    private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) {
+    private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel, final boolean mapDescendantVersionedFlows) {
         final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
         versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
         versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier()));
@@ -192,10 +192,22 @@ public class NiFiRegistryFlowMapper {
                 coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
                 coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
                 coordinates.setVersion(versionControlInfo.getVersion());
+                versionedGroup.setVersionedFlowCoordinates(coordinates);
+
+                // We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports.
+                // Otherwise, we will not be able to lookup the port when connecting to it.
+                for (final Port port : group.getInputPorts()) {
+                    getId(port.getVersionedComponentId(), port.getIdentifier());
+                }
+                for (final Port port : group.getOutputPorts()) {
+                    getId(port.getVersionedComponentId(), port.getIdentifier());
+                }
 
                 // If the Process Group itself is remotely versioned, then we don't want to include its contents
                 // because the contents are remotely managed and not part of the versioning of this Process Group
-                return versionedGroup;
+                if (!mapDescendantVersionedFlows) {
+                    return versionedGroup;
+                }
             }
         }
 
@@ -228,7 +240,7 @@ public class NiFiRegistryFlowMapper {
             .collect(Collectors.toCollection(LinkedHashSet::new)));
 
         versionedGroup.setProcessGroups(group.getProcessGroups().stream()
-            .map(grp -> mapGroup(grp, registryClient, false))
+            .map(grp -> mapGroup(grp, registryClient, false, mapDescendantVersionedFlows))
             .collect(Collectors.toCollection(LinkedHashSet::new)));
 
         versionedGroup.setConnections(group.getConnections().stream()

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 8954f39..9e81d22 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -51,7 +51,7 @@
             <xs:element name="id" type="NonEmptyStringType" />
             <xs:element name="name" type="NonEmptyStringType" />
             <xs:element name="url" type="NonEmptyStringType" />
-            <xs:element name="description" type="NonEmptyStringType" />
+            <xs:element name="description" type="xs:string" />
         </xs:sequence>
     </xs:complexType>
     
@@ -180,7 +180,10 @@
         <xs:sequence>
             <xs:element name="registryId" type="NonEmptyStringType" />
             <xs:element name="bucketId" type="NonEmptyStringType" />
+            <xs:element name="bucketName" type="NonEmptyStringType" />
             <xs:element name="flowId" type="NonEmptyStringType" />
+            <xs:element name="flowName" type="NonEmptyStringType" />
+            <xs:element name="flowDescription" type="xs:string" />
             <xs:element name="version" type="NonEmptyStringType" />
         </xs:sequence>
     </xs:complexType>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index fc42c62..d9f89aa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -36,7 +36,9 @@
     </bean>
 
     <!-- flow registry -->
-    <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient" />
+    <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient">
+        <property name="properties" ref="nifiProperties" />
+    </bean>
 
     <!-- flow controller -->
     <bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean">

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index e299059..be907ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.web;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
 import org.apache.nifi.authorization.AuthorizeAccess;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.user.NiFiUser;
@@ -23,7 +31,7 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
@@ -86,6 +94,7 @@ import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.CurrentUserEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FunnelEntity;
@@ -115,14 +124,6 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-
 /**
  * Defines the NiFiServiceFacade interface.
  */
@@ -1274,6 +1275,17 @@ public interface NiFiServiceFacade {
     // ----------------------------------------
 
     /**
+     * Returns a FlowComparisonEntity that contains all of the local modifications since the Process Group
+     * was last synchronized with the Flow Registry
+     *
+     * @param processGroupId
+     * @return a FlowComparisonEntity that contains all of the local modifications since the Process Group
+     *         was last synchronized with the Flow Registry
+     * @throws IllegalStateException if the Process Group with the given ID is not under version control
+     */
+    FlowComparisonEntity getLocalModifications(String processGroupId) throws IOException, NiFiRegistryException;
+
+    /**
      * Returns the Version Control information for the Process Group with the given ID
      *
      * @param processGroupId the ID of the Process Group
@@ -1292,7 +1304,7 @@ public interface NiFiServiceFacade {
      *
      * @throws IOException if unable to communicate with the Flow Registry
      */
-    VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, UnknownResourceException;
+    VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, NiFiRegistryException;
 
     /**
      * Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry
@@ -1312,11 +1324,13 @@ public interface NiFiServiceFacade {
      * @param flow the flow where the snapshot should be persisted
      * @param snapshot the Snapshot to persist
      * @param comments about the snapshot
+     * @param expectedVersion the version to save the flow as
      * @return the snapshot that represents what was stored in the registry
      *
      * @throws IOException if unable to communicate with the Flow Registry
      */
-    VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException;
+    VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion)
+        throws IOException, NiFiRegistryException;
 
     /**
      * Updates the Version Control Information on the Process Group with the given ID
@@ -1351,6 +1365,15 @@ public interface NiFiServiceFacade {
     VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException;
 
     /**
+     * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
+     * the ID itself as the name
+     *
+     * @param flowRegistryId the id of the flow registry
+     * @return the name of the Flow Registry that is registered with the given ID, or the ID itself if no Flow Registry is registered with the given ID
+     */
+    String getFlowRegistryName(String flowRegistryId);
+
+    /**
      * Determines which components currently exist in the Process Group with the given identifier and calculates which of those components
      * would be impacted by updating the Process Group to the provided snapshot
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index a319f27..89e00ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -90,7 +90,7 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.bucket.Bucket;
 import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedComponent;
 import org.apache.nifi.registry.flow.VersionedConnection;
@@ -101,11 +101,13 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor;
 import org.apache.nifi.registry.flow.diff.FlowComparator;
 import org.apache.nifi.registry.flow.diff.FlowComparison;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
 import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
@@ -128,6 +130,7 @@ import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.BulletinQueryDTO;
 import org.apache.nifi.web.api.dto.ClusterDTO;
 import org.apache.nifi.web.api.dto.ComponentDTO;
+import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
 import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
 import org.apache.nifi.web.api.dto.ComponentReferenceDTO;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
@@ -205,6 +208,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.CurrentUserEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FunnelEntity;
@@ -3628,6 +3632,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
+        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+        final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
+        final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
+
         // Create a VersionedProcessGroup snapshot of the flow as it is currently.
         final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
 
@@ -3660,8 +3668,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
             // add first snapshot to the flow in the registry
             final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription();
-            registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments);
-        } catch (final UnknownResourceException e) {
+            registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments, expectedVersion);
+        } catch (final NiFiRegistryException e) {
             throw new IllegalArgumentException(e);
         } catch (final IOException ioe) {
             // will result in a 500: Internal Server Error
@@ -3671,12 +3679,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
         final VersionControlInformationDTO vci = new VersionControlInformationDTO();
         vci.setBucketId(registeredFlow.getBucketIdentifier());
+        vci.setBucketName(registeredFlow.getBucketName());
         vci.setCurrent(true);
         vci.setFlowId(registeredFlow.getIdentifier());
         vci.setFlowName(registeredFlow.getName());
+        vci.setFlowDescription(registeredFlow.getDescription());
         vci.setGroupId(groupId);
         vci.setModified(false);
         vci.setRegistryId(registryId);
+        vci.setRegistryName(getFlowRegistryName(registryId));
         vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
 
         final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
@@ -3707,12 +3718,67 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
         final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
-        final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient);
+        final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
         return versionedGroup;
     }
 
     @Override
-    public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, UnknownResourceException {
+    public FlowComparisonEntity getLocalModifications(final String processGroupId) throws IOException, NiFiRegistryException {
+        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
+        final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
+        if (versionControlInfo == null) {
+            throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
+        }
+
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier());
+        if (flowRegistry == null) {
+            throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier()
+                + " but cannot find a Flow Registry with that identifier");
+        }
+
+        final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
+            versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion());
+
+
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+        final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
+        final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
+
+        final ComparableDataFlow localFlow = new ComparableDataFlow() {
+            @Override
+            public VersionedProcessGroup getContents() {
+                return localGroup;
+            }
+
+            @Override
+            public String getName() {
+                return "Local Flow";
+            }
+        };
+
+        final ComparableDataFlow registryFlow = new ComparableDataFlow() {
+            @Override
+            public VersionedProcessGroup getContents() {
+                return registryGroup;
+            }
+
+            @Override
+            public String getName() {
+                return "Versioned Flow";
+            }
+        };
+
+        final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new EvolvingDifferenceDescriptor());
+        final FlowComparison flowComparison = flowComparator.compare();
+
+        final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
+        final FlowComparisonEntity entity = new FlowComparisonEntity();
+        entity.setComponentDifferences(differenceDtos);
+        return entity;
+    }
+
+    @Override
+    public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException {
         final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
         if (registry == null) {
             throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
@@ -3721,7 +3787,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return registry.registerVersionedFlow(flow);
     }
 
-    private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+    private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
         final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
         if (registry == null) {
             throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
@@ -3732,13 +3798,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
-            final VersionedProcessGroup snapshot, final String comments) throws IOException, UnknownResourceException {
+        final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) throws IOException, NiFiRegistryException {
         final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
         if (registry == null) {
             throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
         }
 
-        return registry.registerVersionedFlowSnapshot(flow, snapshot, comments);
+        return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion);
     }
 
     @Override
@@ -3778,12 +3844,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
 
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
-        final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient);
+        final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient, true);
 
         final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
-        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", updatedSnapshot.getFlowContents());
+        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
 
-        final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow);
+        final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor());
         final FlowComparison comparison = flowComparator.compare();
 
         final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
@@ -3958,7 +4024,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         final VersionedFlowSnapshot snapshot;
         try {
             snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion());
-        } catch (final UnknownResourceException e) {
+        } catch (final NiFiRegistryException e) {
             throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
                 + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
         }
@@ -3969,6 +4035,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return snapshot;
     }
 
+    @Override
+    public String getFlowRegistryName(final String flowRegistryId) {
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
+        return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
+    }
+
     private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException {
         final VersionedProcessGroup group = snapshot.getFlowContents();
 
@@ -3993,7 +4065,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             final VersionedFlowSnapshot childSnapshot;
             try {
                 childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion());
-            } catch (final UnknownResourceException e) {
+            } catch (final NiFiRegistryException e) {
                 throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
                     + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index b8bdc14..6bf4cca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -39,6 +39,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -1373,7 +1374,7 @@ public class FlowResource extends ApplicationResource {
                     value = "The registry id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") String id) throws NiFiRegistryException {
 
         authorizeFlow();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 11c548f..d24dcbb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -87,8 +87,10 @@ import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.serialization.FlowEncodingVersion;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.FlowRegistryUtils;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -123,6 +125,7 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
 import org.apache.nifi.web.api.entity.Entity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FunnelEntity;
 import org.apache.nifi.web.api.entity.FunnelsEntity;
@@ -301,6 +304,53 @@ public class ProcessGroupResource extends ApplicationResource {
 
 
     /**
+     * Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry
+     *
+     * @param groupId The id of the process group.
+     * @return A processGroupEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/local-modifications")
+    @ApiOperation(
+            value = "Gets a list of local modifications to the Process Group since it was last synchronized with the Flow Registry",
+            response = FlowComparisonEntity.class,
+            authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}"),
+            @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getLocalModifications(
+            @ApiParam(
+                    value = "The process group id.",
+                    required = false
+            )
+            @PathParam("id") final String groupId) throws IOException, NiFiRegistryException {
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+            final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+            super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, true, false);
+        });
+
+        final FlowComparisonEntity entity = serviceFacade.getLocalModifications(groupId);
+        return generateOkResponse(entity).build();
+    }
+
+
+    /**
      * Retrieves the Variable Registry for the group with the given ID
      *
      * @param groupId the ID of the Process Group
@@ -1594,6 +1644,13 @@ public class ProcessGroupResource extends ApplicationResource {
             // Step 2: Retrieve flow from Flow Registry
             final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
 
+            final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+            versionControlInfo.setBucketName(metadata.getBucketName());
+            versionControlInfo.setFlowName(metadata.getFlowName());
+            versionControlInfo.setFlowDescription(metadata.getFlowDescription());
+
+            versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
+
             // Step 3: Resolve Bundle info
             BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
 
@@ -1635,14 +1692,14 @@ public class ProcessGroupResource extends ApplicationResource {
 
                     // create the process group contents
                     final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId());
-                    final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent());
+                    ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent());
 
                     final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
                     if (flowSnapshot != null) {
                         final RevisionDTO revisionDto = entity.getRevision();
                         final String newGroupId = entity.getComponent().getId();
                         final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
-                        serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
+                        entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
                             versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false);
                     }