You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:09 UTC

[21/50] nifi git commit: NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated autho

NIFI-4436: Integrate with actual Flow Registry via REST Client - Store Bucket Name, Flow Name, Flow Description for VersionControlInformation - Added endpoint for determining local modifications to a process group - Updated authorizations required for version control endpoints - Add state and percent complete fields ot VersionedFlowUpdateRequestDTO - If a variable exists in a parent process group, do not include it in imported/updated process group when interacting with flow registry - Code cleanup, documentation; bug fixes - Ensure that we are passing NiFiUser to the flow registry client when appropriate - Updated to work against new version of flow registry client; deleted file-based flow registry client

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f6cc5b6c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f6cc5b6c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f6cc5b6c

Branch: refs/heads/master
Commit: f6cc5b6cdca86b3f535754f7ec4d91ca61f2e35b
Parents: d6e54f1
Author: Mark Payne <ma...@hotmail.com>
Authored: Sat Nov 4 14:19:49 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:53 2018 -0500

----------------------------------------------------------------------
 .../apache/nifi/registry/flow/FlowRegistry.java |  54 +-
 .../flow/VersionControlInformation.java         |  14 +-
 .../nifi/groups/StandardProcessGroup.java       |  53 +-
 .../registry/flow/FileBasedFlowRegistry.java    | 509 -------------------
 .../registry/flow/RestBasedFlowRegistry.java    |  64 ++-
 .../flow/StandardFlowRegistryClient.java        |  14 +-
 .../flow/StandardVersionControlInformation.java |  23 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  10 +
 .../nifi/web/StandardNiFiServiceFacade.java     |  51 +-
 .../nifi/web/api/ProcessGroupResource.java      |  12 +-
 .../apache/nifi/web/api/VersionsResource.java   | 361 +++++++------
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  57 ++-
 12 files changed, 424 insertions(+), 798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 10db9cf..76f96f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -84,14 +84,6 @@ public interface FlowRegistry {
     Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException;
 
     /**
-     * Gets the bucket with the given ID
-     *
-     * @param bucketId the id of the bucket
-     * @return the bucket with the given ID
-     */
-    Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException;
-
-    /**
      * Retrieves the set of all Versioned Flows for the specified bucket
      *
      * @param bucketId the ID of the bucket
@@ -123,7 +115,15 @@ public interface FlowRegistry {
      * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null
      * @throws NiFiRegistryException if the bucket id does not exist
      */
-    VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, NiFiRegistryException;
+    VersionedFlow registerVersionedFlow(VersionedFlow flow, NiFiUser user) throws IOException, NiFiRegistryException;
+
+    /**
+     * Deletes the specified flow from the Flow Registry
+     *
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     */
+    VersionedFlow deleteVersionedFlow(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException;
 
     /**
      * Adds the given snapshot to the Flow Registry for the given flow
@@ -138,7 +138,8 @@ public interface FlowRegistry {
      * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null
      * @throws NiFiRegistryException if the flow does not exist
      */
-    VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion) throws IOException, NiFiRegistryException;
+    VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion, NiFiUser user)
+        throws IOException, NiFiRegistryException;
 
     /**
      * Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow
@@ -150,7 +151,23 @@ public interface FlowRegistry {
      * @throws IOException if unable to communicate with the Flow Registry
      * @throws NiFiRegistryException if unable to find the bucket with the given ID or the flow with the given ID
      */
-    int getLatestVersion(String bucketId, String flowId) throws IOException, NiFiRegistryException;
+    int getLatestVersion(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException;
+
+    /**
+     * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
+     *
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     * @param version the version to retrieve
+     * @return the contents of the Flow from the Flow Registry
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     * @throws NiFiRegistryException if unable to find the contents of the flow due to the bucket or flow not existing,
+     *             or the specified version of the flow not existing
+     * @throws NullPointerException if any of the arguments is not specified
+     * @throws IllegalArgumentException if the given version is less than 1
+     */
+    VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException;
 
     /**
      * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
@@ -166,7 +183,6 @@ public interface FlowRegistry {
      * @throws NullPointerException if any of the arguments is not specified
      * @throws IllegalArgumentException if the given version is less than 1
      */
-    // TODO: Need to pass in user
     VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException;
 
     /**
@@ -179,6 +195,18 @@ public interface FlowRegistry {
      * @throws IOException if unable to communicate with the Flow Registry
      * @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
      */
-    // TODO: Need to pass in user
+    VersionedFlow getVersionedFlow(String bucketId, String flowId, NiFiUser user) throws IOException, NiFiRegistryException;
+
+    /**
+     * Retrieves a VersionedFlow by bucket id and flow id
+     *
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     * @return the VersionedFlow for the given bucket and flow ID's
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     * @throws NiFiRegistryException if unable to find a flow with the given bucket ID and flow ID
+     */
+    // TODO: Do we still need this?
     VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, NiFiRegistryException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
index 67c3635..b54a1c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
@@ -17,8 +17,6 @@
 
 package org.apache.nifi.registry.flow;
 
-import java.util.Optional;
-
 /**
  * <p>
  * Provides a mechanism for conveying which Flow Registry a flow is stored in, and
@@ -69,18 +67,14 @@ public interface VersionControlInformation {
 
     /**
      * @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
-     *         to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry. An empty optional will be returned
-     *         if it is not yet known whether or not the flow has been modified (for example, on startup, when the flow has not yet been
-     *         fetched from the Flow Registry)
+     *         to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry.
      */
-    Optional<Boolean> getModified();
+    boolean isModified();
 
     /**
-     * @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry.
-     *         An empty optional will be returned if it is not yet known whether or not the flow has been modified (for example, on startup,
-     *         when the flow has not yet been fetched from the Flow Registry)
+     * @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry, <code>false</code> otherwise.
      */
-    Optional<Boolean> getCurrent();
+    boolean isCurrent();
 
     /**
      * @return the snapshot of the flow that was synchronized with the Flow Registry

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 2783e96..d1aa4e2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -2822,17 +2822,17 @@ public final class StandardProcessGroup implements ProcessGroup {
             versionControlInformation.getFlowIdentifier(),
             versionControlInformation.getVersion(),
             versionControlInformation.getFlowSnapshot(),
-            versionControlInformation.getModified().orElse(null),
-            versionControlInformation.getCurrent().orElse(null)) {
+            versionControlInformation.isModified(),
+            versionControlInformation.isCurrent()) {
 
             @Override
-            public Optional<Boolean> getModified() {
+            public boolean isModified() {
                 final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications();
                 if (differences == null) {
-                    return Optional.ofNullable(null);
+                    return false;
                 }
 
-                return Optional.of(!differences.isEmpty());
+                return !differences.isEmpty();
             }
         };
 
@@ -2938,7 +2938,6 @@ public final class StandardProcessGroup implements ProcessGroup {
         try {
             final VersionedFlow versionedFlow = flowRegistry.getVersionedFlow(vci.getBucketIdentifier(), vci.getFlowIdentifier());
             final int latestVersion = (int) versionedFlow.getVersionCount();
-
             vci.setBucketName(versionedFlow.getBucketName());
             vci.setFlowName(versionedFlow.getName());
             vci.setFlowDescription(versionedFlow.getDescription());
@@ -2986,7 +2985,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
             }
 
-            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings);
+            final Set<String> knownVariables = getKnownVariableNames();
+            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables);
         } catch (final ProcessorInstantiationException pie) {
             throw new RuntimeException(pie);
         } finally {
@@ -2994,9 +2994,26 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    private Set<String> getKnownVariableNames() {
+        final Set<String> variableNames = new HashSet<>();
+        populateKnownVariableNames(this, variableNames);
+        return variableNames;
+    }
+
+    private void populateKnownVariableNames(final ProcessGroup group, final Set<String> knownVariables) {
+        group.getVariableRegistry().getVariableMap().keySet().stream()
+            .map(VariableDescriptor::getName)
+            .forEach(knownVariables::add);
+
+        final ProcessGroup parent = group.getParent();
+        if (parent != null) {
+            populateKnownVariableNames(parent, knownVariables);
+        }
+    }
+
 
     private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
-        final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException {
+        final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set<String> variablesToSkip) throws ProcessorInstantiationException {
 
         group.setComments(proposed.getComments());
 
@@ -3027,7 +3044,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         // If any new variables exist in the proposed flow, add those to the variable registry.
         for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) {
-            if (!existingVariableNames.contains(entry.getKey())) {
+            if (!existingVariableNames.contains(entry.getKey()) && !variablesToSkip.contains(entry.getKey())) {
                 updatedVariableMap.put(entry.getKey(), entry.getValue());
             }
         }
@@ -3068,10 +3085,10 @@ public final class StandardProcessGroup implements ProcessGroup {
             final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
 
             if (childGroup == null) {
-                final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed);
+                final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
                 LOG.info("Added {} to {}", added, this);
             } else {
-                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName);
+                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip);
                 LOG.info("Updated {}", childGroup);
             }
 
@@ -3345,11 +3362,12 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
 
-    private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException {
+    private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip)
+            throws ProcessorInstantiationException {
         final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
         group.setVersionedComponentId(proposed.getIdentifier());
         group.setParent(destination);
-        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true);
+        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip);
         destination.addProcessGroup(group);
         return group;
     }
@@ -3771,16 +3789,11 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
 
                 if (verifyNotDirty) {
-                    final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
-                    if (!modifiedOption.isPresent()) {
-                        throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
-                            + "has not yet been synchronized with the Flow Registry. The Process Group must be"
-                            + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
-                    }
+                    final boolean modified = versionControlInfo.isModified();
 
                     final Set<FlowDifference> modifications = getModifications();
 
-                    if (Boolean.TRUE.equals(modifiedOption.get())) {
+                    if (modified) {
                         final String changes = modifications.stream()
                             .map(FlowDifference::toString)
                             .collect(Collectors.joining("\n"));

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
deleted file mode 100644
index 9b3ba94..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
+++ /dev/null
@@ -1,509 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.registry.flow;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.registry.bucket.Bucket;
-import org.apache.nifi.registry.client.NiFiRegistryException;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * A simple file-based implementation of a Flow Registry Client. Rather than interacting
- * with an actual Flow Registry, this implementation simply reads flows from disk and writes
- * them to disk. It is not meant for any production use but is available for testing purposes.
- */
-public class FileBasedFlowRegistry implements FlowRegistry {
-    private final File directory;
-    private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
-    private final JsonFactory jsonFactory = new JsonFactory();
-    private final String id;
-    private volatile String name = "Local Registry";
-    private volatile String url = "file:" + (new File("..").getAbsolutePath());
-    private volatile String description = "Default file-based Flow Registry";
-
-    public FileBasedFlowRegistry(final String id, final String url) throws IOException {
-        final URI uri = URI.create(url);
-        if (!uri.getScheme().equalsIgnoreCase("file")) {
-            throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'");
-        }
-
-        this.directory = new File(URI.create(url).getPath());
-
-        if (!directory.exists() && !directory.mkdirs()) {
-            throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
-        }
-
-        this.id = id;
-        this.url = url;
-        recoverBuckets();
-    }
-
-    private void recoverBuckets() throws IOException {
-        final File[] bucketDirs = directory.listFiles();
-        if (bucketDirs == null) {
-            throw new IOException("Could not get listing of directory " + directory);
-        }
-
-        for (final File bucketDir : bucketDirs) {
-            final File[] flowDirs = bucketDir.listFiles();
-            if (flowDirs == null) {
-                throw new IOException("Could not get listing of directory " + bucketDir);
-            }
-
-            final Set<String> flowNames = new HashSet<>();
-            for (final File flowDir : flowDirs) {
-                final File propsFile = new File(flowDir, "flow.properties");
-                if (!propsFile.exists()) {
-                    continue;
-                }
-
-                final Properties properties = new Properties();
-                try (final InputStream in = new FileInputStream(propsFile)) {
-                    properties.load(in);
-                }
-
-                final String flowName = properties.getProperty("name");
-                if (flowName == null) {
-                    continue;
-                }
-
-                flowNames.add(flowName);
-            }
-
-            if (!flowNames.isEmpty()) {
-                flowNamesByBucket.put(bucketDir.getName(), flowNames);
-            }
-        }
-    }
-
-    @Override
-    public String getURL() {
-        return url;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public Set<Bucket> getBuckets(NiFiUser user) throws IOException {
-        final Set<Bucket> buckets = new HashSet<>();
-
-        final File[] bucketDirs = directory.listFiles();
-        if (bucketDirs == null) {
-            throw new IOException("Could not get listing of directory " + directory);
-        }
-
-        for (final File bucketDirectory : bucketDirs) {
-            final String bucketIdentifier = bucketDirectory.getName();
-            final long creation = bucketDirectory.lastModified();
-
-            final Bucket bucket = new Bucket();
-            bucket.setIdentifier(bucketIdentifier);
-            bucket.setName("Bucket '" + bucketIdentifier + "'");
-            bucket.setCreatedTimestamp(creation);
-
-            final Set<VersionedFlow> versionedFlows = new HashSet<>();
-            final File[] flowDirs = bucketDirectory.listFiles();
-            if (flowDirs != null) {
-                for (final File flowDir : flowDirs) {
-                    final String flowIdentifier = flowDir.getName();
-                    try {
-                        final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
-                        versionedFlows.add(versionedFlow);
-                    } catch (NiFiRegistryException e) {
-                        continue;
-                    }
-                }
-            }
-
-            bucket.setVersionedFlows(versionedFlows);
-
-            buckets.add(bucket);
-        }
-
-        return buckets;
-    }
-
-    @Override
-    public Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException {
-        return getBucket(bucketId, null);
-    }
-
-    @Override
-    public Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException {
-        return getBuckets(user).stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null);
-    }
-
-    @Override
-    public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final Bucket bucket = getBuckets(user).stream().filter(b -> bucketId.equals(b.getIdentifier())).findFirst().orElse(null);
-        if (bucket == null) {
-            return Collections.emptySet();
-        }
-
-        return bucket.getVersionedFlows();
-    }
-
-    @Override
-    public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final VersionedFlow flow = getFlows(bucketId, user).stream().filter(f -> flowId.equals(f.getIdentifier())).findFirst().orElse(null);
-        if (flow == null) {
-            return Collections.emptySet();
-        }
-
-        return flow.getSnapshotMetadata();
-    }
-
-    @Override
-    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
-        Objects.requireNonNull(flow);
-        Objects.requireNonNull(flow.getBucketIdentifier());
-        Objects.requireNonNull(flow.getName());
-
-        // Verify that bucket exists
-        final File bucketDir = new File(directory, flow.getBucketIdentifier());
-        if (!bucketDir.exists()) {
-            throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
-        }
-
-        // Verify that there is no flow with the same name in that bucket
-        final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier());
-        if (flowNames != null && flowNames.contains(flow.getName())) {
-            throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier());
-        }
-
-        final String flowIdentifier = UUID.randomUUID().toString();
-        final File flowDir = new File(bucketDir, flowIdentifier);
-        if (!flowDir.mkdirs()) {
-            throw new IOException("Failed to create directory " + flowDir + " for new Flow");
-        }
-
-        final File propertiesFile = new File(flowDir, "flow.properties");
-
-        final Properties flowProperties = new Properties();
-        flowProperties.setProperty("name", flow.getName());
-        flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
-        flowProperties.setProperty("description", flow.getDescription());
-        flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
-
-        try (final OutputStream out = new FileOutputStream(propertiesFile)) {
-            flowProperties.store(out, null);
-        }
-
-        final VersionedFlow response = new VersionedFlow();
-        response.setBucketIdentifier(flow.getBucketIdentifier());
-        response.setCreatedTimestamp(flow.getCreatedTimestamp());
-        response.setDescription(flow.getDescription());
-        response.setIdentifier(flowIdentifier);
-        response.setModifiedTimestamp(flow.getModifiedTimestamp());
-        response.setName(flow.getName());
-
-        return response;
-    }
-
-    @Override
-    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
-        throws IOException, NiFiRegistryException {
-        Objects.requireNonNull(flow);
-        Objects.requireNonNull(flow.getBucketIdentifier());
-        Objects.requireNonNull(flow.getName());
-        Objects.requireNonNull(snapshot);
-
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, flow.getBucketIdentifier());
-        if (!bucketDir.exists()) {
-            throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier());
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flow.getIdentifier());
-        if (!flowDir.exists()) {
-            throw new NiFiRegistryException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
-        }
-
-        final File[] versionDirs = flowDir.listFiles();
-        if (versionDirs == null) {
-            throw new IOException("Unable to perform listing of directory " + flowDir);
-        }
-
-        int maxVersion = 0;
-        for (final File versionDir : versionDirs) {
-            final String versionName = versionDir.getName();
-
-            final int version;
-            try {
-                version = Integer.parseInt(versionName);
-            } catch (final NumberFormatException nfe) {
-                continue;
-            }
-
-            if (version > maxVersion) {
-                maxVersion = version;
-            }
-        }
-
-        final int snapshotVersion = maxVersion + 1;
-        final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
-        if (!snapshotDir.mkdir()) {
-            throw new IOException("Could not create directory " + snapshotDir);
-        }
-
-        final File contentsFile = new File(snapshotDir, "flow.xml");
-
-        try (final OutputStream out = new FileOutputStream(contentsFile);
-            final JsonGenerator generator = jsonFactory.createGenerator(out)) {
-            generator.setCodec(new ObjectMapper());
-            generator.setPrettyPrinter(new DefaultPrettyPrinter());
-            generator.writeObject(snapshot);
-        }
-
-        final Properties snapshotProperties = new Properties();
-        snapshotProperties.setProperty("comments", comments);
-        snapshotProperties.setProperty("name", flow.getName());
-        final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
-        try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
-            snapshotProperties.store(out, null);
-        }
-
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
-        snapshotMetadata.setComments(comments);
-        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
-        snapshotMetadata.setFlowName(flow.getName());
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(snapshotVersion);
-
-        final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
-        response.setSnapshotMetadata(snapshotMetadata);
-        response.setFlowContents(snapshot);
-        return response;
-    }
-
-    @Override
-    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
-        }
-
-        final File[] versionDirs = flowDir.listFiles();
-        if (versionDirs == null) {
-            throw new IOException("Unable to perform listing of directory " + flowDir);
-        }
-
-        int maxVersion = 0;
-        for (final File versionDir : versionDirs) {
-            final String versionName = versionDir.getName();
-
-            final int version;
-            try {
-                version = Integer.parseInt(versionName);
-            } catch (final NumberFormatException nfe) {
-                continue;
-            }
-
-            if (version > maxVersion) {
-                maxVersion = version;
-            }
-        }
-
-        return maxVersion;
-    }
-
-    @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, NiFiRegistryException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
-        }
-
-        final File versionDir = new File(flowDir, String.valueOf(version));
-        if (!versionDir.exists()) {
-            throw new NiFiRegistryException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
-        }
-
-        final File contentsFile = new File(versionDir, "flow.xml");
-
-        final VersionedProcessGroup processGroup;
-        try (final JsonParser parser = jsonFactory.createParser(contentsFile)) {
-            final ObjectMapper mapper = new ObjectMapper();
-            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-            parser.setCodec(mapper);
-            processGroup = parser.readValueAs(VersionedProcessGroup.class);
-        }
-
-        final Properties properties = new Properties();
-        final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
-        try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
-            properties.load(in);
-        }
-
-        final String comments = properties.getProperty("comments");
-        final String flowName = properties.getProperty("name");
-
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setBucketIdentifier(bucketId);
-        snapshotMetadata.setComments(comments);
-        snapshotMetadata.setFlowIdentifier(flowId);
-        snapshotMetadata.setFlowName(flowName);
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(version);
-
-        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
-        snapshot.setFlowContents(processGroup);
-        snapshot.setSnapshotMetadata(snapshotMetadata);
-
-        return snapshot;
-    }
-
-    @Override
-    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new NiFiRegistryException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
-        }
-
-        final File flowPropsFile = new File(flowDir, "flow.properties");
-        final Properties flowProperties = new Properties();
-        try (final InputStream in = new FileInputStream(flowPropsFile)) {
-            flowProperties.load(in);
-        }
-
-        final VersionedFlow flow = new VersionedFlow();
-        flow.setBucketIdentifier(bucketId);
-        flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
-        flow.setDescription(flowProperties.getProperty("description"));
-        flow.setIdentifier(flowId);
-        flow.setModifiedTimestamp(flowDir.lastModified());
-        flow.setName(flowProperties.getProperty("name"));
-
-        final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
-
-        final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
-        flow.setSnapshotMetadata(snapshotMetadataSet);
-
-        final File[] versionDirs = flowDir.listFiles();
-        flow.setVersionCount(versionDirs.length);
-
-        for (final File file : versionDirs) {
-            if (!file.isDirectory()) {
-                continue;
-            }
-
-            int version;
-            try {
-                version = Integer.parseInt(file.getName());
-            } catch (final NumberFormatException nfe) {
-                // not a version. skip.
-                continue;
-            }
-
-            final File snapshotPropsFile = new File(file, "snapshot.properties");
-            final Properties snapshotProperties = new Properties();
-            try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
-                snapshotProperties.load(in);
-            }
-
-            final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
-            metadata.setBucketIdentifier(bucketId);
-            metadata.setComments(snapshotProperties.getProperty("comments"));
-            metadata.setFlowIdentifier(flowId);
-            metadata.setFlowName(snapshotProperties.getProperty("name"));
-            metadata.setTimestamp(file.lastModified());
-            metadata.setVersion(version);
-
-            snapshotMetadataSet.add(metadata);
-        }
-
-        return flow;
-    }
-
-    @Override
-    public String getIdentifier() {
-        return id;
-    }
-
-    @Override
-    public String getDescription() {
-        return description;
-    }
-
-    @Override
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    @Override
-    public void setURL(String url) {
-        this.url = url;
-    }
-
-    @Override
-    public void setName(String name) {
-        this.name = name;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index 26be69b..8bf89c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -111,55 +111,59 @@ public class RestBasedFlowRegistry implements FlowRegistry {
         this.name = name;
     }
 
-    @Override
-    public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
-        final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity());
-        return new HashSet<>(bucketClient.getAll());
+    private String getIdentity(final NiFiUser user) {
+        return (user == null || user.isAnonymous()) ? null : user.getIdentity();
     }
 
     @Override
-    public Bucket getBucket(final String bucketId) throws IOException, NiFiRegistryException {
-        final BucketClient bucketClient = getRegistryClient().getBucketClient();
-        return bucketClient.get(bucketId);
+    public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException {
+        final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
+        return new HashSet<>(bucketClient.getAll());
     }
 
     @Override
     public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity());
+        final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user));
         return bucketClient.get(bucketId);
     }
 
 
     @Override
     public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowClient flowClient = getRegistryClient().getFlowClient(user.isAnonymous() ? null : user.getIdentity());
+        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
         return new HashSet<>(flowClient.getByBucket(bucketId));
     }
 
     @Override
     public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
-        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(user.isAnonymous() ? null : user.getIdentity());
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
         return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId));
     }
 
     @Override
-    public VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException {
-        final FlowClient flowClient = getRegistryClient().getFlowClient();
+    public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
         return flowClient.create(flow);
     }
 
     @Override
-    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion)
-            throws IOException, NiFiRegistryException {
+    public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+        return flowClient.delete(bucketId, flowId);
+    }
+
+    @Override
+    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
+        final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
 
-        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
         final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
         versionedFlowSnapshot.setFlowContents(snapshot);
 
         final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
         metadata.setBucketIdentifier(flow.getBucketIdentifier());
         metadata.setFlowIdentifier(flow.getIdentifier());
-        metadata.setFlowName(flow.getName());
+        metadata.setAuthor(getIdentity(user));
         metadata.setTimestamp(System.currentTimeMillis());
         metadata.setVersion(expectedVersion);
         metadata.setComments(comments);
@@ -169,24 +173,29 @@ public class RestBasedFlowRegistry implements FlowRegistry {
     }
 
     @Override
-    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
-        return (int) getRegistryClient().getFlowClient().get(bucketId, flowId).getVersionCount();
+    public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount();
     }
 
     @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
-        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient();
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
         final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
 
         final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
         for (final VersionedProcessGroup child : contents.getProcessGroups()) {
-            populateVersionedContentsRecursively(child);
+            populateVersionedContentsRecursively(child, user);
         }
 
         return flowSnapshot;
     }
 
-    private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException {
+    @Override
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
+        return getFlowContents(bucketId, flowId, version, null);
+    }
+
+    private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException {
         if (group == null) {
             return;
         }
@@ -205,7 +214,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
             }
 
             final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
-            final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version);
+            final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user);
             final VersionedProcessGroup contents = snapshot.getFlowContents();
 
             group.setComments(contents.getComments());
@@ -222,14 +231,19 @@ public class RestBasedFlowRegistry implements FlowRegistry {
         }
 
         for (final VersionedProcessGroup child : group.getProcessGroups()) {
-            populateVersionedContentsRecursively(child);
+            populateVersionedContentsRecursively(child, user);
         }
     }
 
     @Override
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException {
+        final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user));
+        return flowClient.get(bucketId, flowId);
+    }
+
+    @Override
     public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
         final FlowClient flowClient = getRegistryClient().getFlowClient();
         return flowClient.get(bucketId, flowId);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
index d5d0d86..8a2447d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.registry.flow;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -56,14 +55,15 @@ public class StandardFlowRegistryClient implements FlowRegistryClient {
         final String uriScheme = uri.getScheme();
 
         final FlowRegistry registry;
-        if (uriScheme.equalsIgnoreCase("file")) {
-            try {
-                registry = new FileBasedFlowRegistry(registryId, registryUrl);
-            } catch (IOException e) {
-                throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl, e);
+        if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
+            final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);
+            if (sslContext == null && uriScheme.equalsIgnoreCase("https")) {
+                throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl
+                    + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. "
+                    + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https.");
             }
 
-            registry.setName(registryName);
+            registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName);
             registry.setDescription(description);
         } else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) {
             final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
index aaba126..92a4166 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.registry.flow;
 
 import java.util.Objects;
-import java.util.Optional;
 
 import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 
@@ -33,8 +32,8 @@ public class StandardVersionControlInformation implements VersionControlInformat
     private volatile String flowDescription;
     private final int version;
     private volatile VersionedProcessGroup flowSnapshot;
-    private volatile Boolean modified = null;
-    private volatile Boolean current = null;
+    private volatile boolean modified;
+    private volatile boolean current;
 
     public static class Builder {
         private String registryIdentifier;
@@ -89,12 +88,12 @@ public class StandardVersionControlInformation implements VersionControlInformat
             return this;
         }
 
-        public Builder modified(Boolean modified) {
+        public Builder modified(boolean modified) {
             this.modified = modified;
             return this;
         }
 
-        public Builder current(Boolean current) {
+        public Builder current(boolean current) {
             this.current = current;
             return this;
         }
@@ -113,8 +112,8 @@ public class StandardVersionControlInformation implements VersionControlInformat
                 .flowId(dto.getFlowId())
                 .flowName(dto.getFlowName())
                 .flowDescription(dto.getFlowDescription())
-                .current(dto.getCurrent())
-                .modified(dto.getModified())
+                .current(dto.getCurrent() == null ? true : dto.getCurrent())
+                .modified(dto.getModified() == null ? false : dto.getModified())
                 .version(dto.getVersion());
 
             return builder;
@@ -139,7 +138,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
 
 
     public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
-        final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) {
+        final VersionedProcessGroup snapshot, final boolean modified, final boolean current) {
         this.registryIdentifier = registryId;
         this.registryName = registryName;
         this.bucketIdentifier = bucketId;
@@ -208,13 +207,13 @@ public class StandardVersionControlInformation implements VersionControlInformat
     }
 
     @Override
-    public Optional<Boolean> getModified() {
-        return Optional.ofNullable(modified);
+    public boolean isModified() {
+        return modified;
     }
 
     @Override
-    public Optional<Boolean> getCurrent() {
-        return Optional.ofNullable(current);
+    public boolean isCurrent() {
+        return current;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index be907ba..76cd2c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1318,6 +1318,16 @@ public interface NiFiServiceFacade {
     VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, StartVersionControlRequestEntity requestEntity);
 
     /**
+     * Deletes the specified Versioned Flow from the specified Flow Registry
+     *
+     * @param registryId the ID of the Flow Registry
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     * @return the VersionedFlow that was deleted
+     */
+    VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId) throws IOException, NiFiRegistryException;
+
+    /**
      * Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
      *
      * @param registryId the ID of the Flow Registry to persist the snapshot to

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 89e00ba..4d1bbbc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
@@ -88,9 +90,9 @@ import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.history.PreviousValue;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedComponent;
 import org.apache.nifi.registry.flow.VersionedConnection;
@@ -228,6 +230,7 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
 import org.apache.nifi.web.api.entity.SnippetEntity;
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
@@ -237,7 +240,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryEntity;
 import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
-import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
@@ -268,8 +270,8 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -3667,8 +3669,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             action = "add the local flow to the Flow Registry as the first Snapshot";
 
             // add first snapshot to the flow in the registry
-            final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription();
-            registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments, expectedVersion);
+            registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
         } catch (final NiFiRegistryException e) {
             throw new IllegalArgumentException(e);
         } catch (final IOException ioe) {
@@ -3676,14 +3677,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action);
         }
 
+        final Bucket bucket = registeredSnapshot.getBucket();
+        final VersionedFlow flow = registeredSnapshot.getFlow();
+
         // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
         final VersionControlInformationDTO vci = new VersionControlInformationDTO();
-        vci.setBucketId(registeredFlow.getBucketIdentifier());
-        vci.setBucketName(registeredFlow.getBucketName());
+        vci.setBucketId(bucket.getIdentifier());
+        vci.setBucketName(bucket.getName());
         vci.setCurrent(true);
-        vci.setFlowId(registeredFlow.getIdentifier());
-        vci.setFlowName(registeredFlow.getName());
-        vci.setFlowDescription(registeredFlow.getDescription());
+        vci.setFlowId(flow.getIdentifier());
+        vci.setFlowName(flow.getName());
+        vci.setFlowDescription(flow.getDescription());
         vci.setGroupId(groupId);
         vci.setModified(false);
         vci.setRegistryId(registryId);
@@ -3703,6 +3707,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
+        final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
+        if (registry == null) {
+            throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
+        }
+
+        return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
     public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
         final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
         final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
@@ -3737,8 +3751,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         }
 
         final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
-            versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion());
-
+            versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
 
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
         final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
@@ -3784,7 +3797,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
         }
 
-        return registry.registerVersionedFlow(flow);
+        return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser());
     }
 
     private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
@@ -3793,7 +3806,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
         }
 
-        return registry.getVersionedFlow(bucketId, flowId);
+        return registry.getVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
     }
 
     @Override
@@ -3804,7 +3817,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
         }
 
-        return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion);
+        return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser());
     }
 
     @Override
@@ -4023,7 +4036,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
         final VersionedFlowSnapshot snapshot;
         try {
-            snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion());
+            snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
         } catch (final NiFiRegistryException e) {
             throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
                 + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
@@ -4064,7 +4077,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
             final VersionedFlowSnapshot childSnapshot;
             try {
-                childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion());
+                childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser());
             } catch (final NiFiRegistryException e) {
                 throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
                     + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());

http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index d24dcbb..fe57dd6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -87,10 +87,11 @@ import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.serialization.FlowEncodingVersion;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.registry.bucket.Bucket;
 import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.FlowRegistryUtils;
+import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -1643,11 +1644,12 @@ public class ProcessGroupResource extends ApplicationResource {
             // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
             // Step 2: Retrieve flow from Flow Registry
             final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
+            final Bucket bucket = flowSnapshot.getBucket();
+            final VersionedFlow flow = flowSnapshot.getFlow();
 
-            final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
-            versionControlInfo.setBucketName(metadata.getBucketName());
-            versionControlInfo.setFlowName(metadata.getFlowName());
-            versionControlInfo.setFlowDescription(metadata.getFlowDescription());
+            versionControlInfo.setBucketName(bucket.getName());
+            versionControlInfo.setFlowName(flow.getName());
+            versionControlInfo.setFlowDescription(flow.getDescription());
 
             versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));