You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:13:53 UTC

[05/50] nifi git commit: NIFI-4436: Added additional endpoints; bug fixes

NIFI-4436: Added additional endpoints; bug fixes

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6aa8b5c6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6aa8b5c6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6aa8b5c6

Branch: refs/heads/master
Commit: 6aa8b5c61c734ce56aad7816b40df88d8316feeb
Parents: 7a0a900
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 30 16:35:59 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:52 2018 -0500

----------------------------------------------------------------------
 .../nifi/web/api/entity/ProcessGroupEntity.java |  17 +-
 .../org/apache/nifi/groups/ProcessGroup.java    |   3 +-
 .../apache/nifi/registry/flow/FlowRegistry.java |  30 ++
 .../nifi/registry/flow/FlowRegistryClient.java  |   6 +
 .../apache/nifi/controller/FlowController.java  | 103 ++++
 .../controller/StandardFlowSynchronizer.java    |  36 +-
 .../serialization/StandardFlowSerializer.java   |  29 +-
 .../nifi/fingerprint/FingerprintFactory.java    |  21 +
 .../nifi/groups/StandardProcessGroup.java       | 134 ++++--
 .../registry/flow/FileBasedFlowRegistry.java    | 478 +++++++++++++++++++
 .../flow/FileBasedFlowRegistryClient.java       | 435 -----------------
 .../flow/StandardFlowRegistryClient.java        |  75 +++
 .../flow/mapping/NiFiRegistryFlowMapper.java    |  95 +++-
 .../java/org/apache/nifi/util/BundleUtils.java  |  48 +-
 .../src/main/resources/FlowConfiguration.xsd    |  17 +
 .../src/main/resources/nifi-context.xml         |   4 +-
 .../service/mock/MockProcessGroup.java          |   2 +-
 .../fingerprint/FingerprintFactoryTest.java     |   8 +
 .../nifi/registry/flow/FlowRegistryUtils.java   |  74 +++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  12 +-
 .../nifi/web/StandardNiFiServiceFacade.java     | 266 ++++++-----
 .../nifi/web/api/ProcessGroupResource.java      | 173 ++++---
 .../apache/nifi/web/api/VersionsResource.java   | 151 +++---
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  10 +
 .../nifi/web/controller/ControllerFacade.java   |   4 +
 .../apache/nifi/web/dao/ProcessGroupDAO.java    |   3 +-
 .../org/apache/nifi/web/dao/RegistryDAO.java    |  35 ++
 .../nifi/web/dao/impl/FlowRegistryDAO.java      |  66 +++
 .../web/dao/impl/StandardProcessGroupDAO.java   |   5 +-
 .../src/main/resources/nifi-web-api-context.xml |   4 +
 30 files changed, 1578 insertions(+), 766 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
index 83af9d8..1e2a4b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.web.api.entity;
 
-import io.swagger.annotations.ApiModelProperty;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 
-import javax.xml.bind.annotation.XmlRootElement;
+import io.swagger.annotations.ApiModelProperty;
 
 /**
  * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessGroupDTO.
@@ -30,6 +32,7 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
 
     private ProcessGroupDTO component;
     private ProcessGroupStatusDTO status;
+    private VersionedFlowSnapshot versionedFlowSnapshot;
 
     private Integer runningCount;
     private Integer stoppedCount;
@@ -46,10 +49,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
      *
      * @return The ProcessGroupDTO object
      */
+    @Override
     public ProcessGroupDTO getComponent() {
         return component;
     }
 
+    @Override
     public void setComponent(ProcessGroupDTO component) {
         this.component = component;
     }
@@ -180,4 +185,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
         this.inactiveRemotePortCount = inactiveRemotePortCount;
     }
 
+    @ApiModelProperty(value = "Returns the Versioned Flow that describes the contents of the Versioned Flow to be imported", readOnly = true)
+    public VersionedFlowSnapshot getVersionedFlowSnapshot() {
+        return versionedFlowSnapshot;
+    }
+
+    public void setVersionedFlowSnapshot(VersionedFlowSnapshot versionedFlowSnapshot) {
+        this.versionedFlowSnapshot = versionedFlowSnapshot;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index d335461..16b4b5e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -783,8 +783,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
      * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
      *            and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
      *            throw an IllegalStateException
+     * @param updateSettings whether or not to update the process group's name and positions
      */
-    void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty);
+    void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings);
 
     /**
      * Verifies a template with the specified name can be created.

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 962a940..4efff94 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -24,6 +24,22 @@ import java.io.IOException;
 import java.util.Set;
 
 public interface FlowRegistry {
+    /**
+     * @return the ID of the Flow Registry
+     */
+    String getIdentifier();
+
+    /**
+     * @return the description of the Flow Registry
+     */
+    String getDescription();
+
+    /**
+     * Updates the Flow Registry's description
+     *
+     * @param description the description of the Flow Registry
+     */
+    void setDescription(String description);
 
     /**
      * @return the URL of the Flow Registry
@@ -31,11 +47,25 @@ public interface FlowRegistry {
     String getURL();
 
     /**
+     * Updates the Flow Registry's URL
+     *
+     * @param url the URL of the Flow Registry
+     */
+    void setURL(String url);
+
+    /**
      * @return the name of the Flow Registry
      */
     String getName();
 
     /**
+     * Updates the name of the Flow Registry
+     *
+     * @param name the name of the Flow Registry
+     */
+    void setName(String name);
+
+    /**
      * Gets the buckets for the specified user.
      *
      * @param user current user

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
index 83f66dc..77c2761 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
@@ -34,4 +34,10 @@ public interface FlowRegistryClient {
     }
 
     Set<String> getRegistryIdentifiers();
+
+    void addFlowRegistry(FlowRegistry registry);
+
+    FlowRegistry addFlowRegistry(String registryId, String registryName, String registryUrl, String description);
+
+    FlowRegistry removeFlowRegistry(String registryId);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 242ef6a..5ed5b6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -165,6 +165,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -2128,6 +2130,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
+        final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+        if (!supportedBundles.contains(requiredCoordinate)) {
+            throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
+        }
+    }
+
     private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
         if (templateContents.getProcessors() != null) {
             templateContents.getProcessors().forEach(processor -> {
@@ -2150,6 +2159,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (versionedFlow.getProcessors() != null) {
+            versionedFlow.getProcessors().forEach(processor -> {
+                if (processor.getBundle() == null) {
+                    throw new IllegalArgumentException("Processor bundle must be specified.");
+                }
+
+                if (supportedTypes.containsKey(processor.getType())) {
+                    verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
+                }
+            });
+        }
+
+        if (versionedFlow.getProcessGroups() != null) {
+            versionedFlow.getProcessGroups().forEach(processGroup -> {
+                verifyProcessorsInVersionedFlow(processGroup, supportedTypes);
+            });
+        }
+    }
+
     private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
         if (templateContents.getControllerServices() != null) {
             templateContents.getControllerServices().forEach(controllerService -> {
@@ -2172,6 +2203,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (versionedFlow.getControllerServices() != null) {
+            versionedFlow.getControllerServices().forEach(controllerService -> {
+                if (supportedTypes.containsKey(controllerService.getType())) {
+                    if (controllerService.getBundle() == null) {
+                        throw new IllegalArgumentException("Controller Service bundle must be specified.");
+                    }
+
+                    verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
+                }
+            });
+        }
+
+        if (versionedFlow.getProcessGroups() != null) {
+            versionedFlow.getProcessGroups().forEach(processGroup -> {
+                verifyControllerServicesInVersionedFlow(processGroup, supportedTypes);
+            });
+        }
+    }
+
     public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
         final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
         for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
@@ -2210,6 +2263,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) {
+        final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
+            final String name = c.getName();
+            processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+        }
+        verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
+
+        final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+            final String name = c.getName();
+            controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+        }
+        verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
+
+        final Set<String> prioritizerClasses = new HashSet<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+            prioritizerClasses.add(c.getName());
+        }
+
+        final Set<VersionedConnection> allConns = new HashSet<>();
+        allConns.addAll(versionedFlow.getConnections());
+        for (final VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) {
+            allConns.addAll(findAllConnections(childGroup));
+        }
+
+        for (final VersionedConnection conn : allConns) {
+            final List<String> prioritizers = conn.getPrioritizers();
+            if (prioritizers != null) {
+                for (final String prioritizer : prioritizers) {
+                    if (!prioritizerClasses.contains(prioritizer)) {
+                        throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
+                    }
+                }
+            }
+        }
+    }
+
     /**
      * <p>
      * Verifies that the given DTO is valid, according to the following:
@@ -2270,6 +2361,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return conns;
     }
 
+    private Set<VersionedConnection> findAllConnections(final VersionedProcessGroup group) {
+        final Set<VersionedConnection> conns = new HashSet<>();
+        for (final VersionedConnection connection : group.getConnections()) {
+            conns.add(connection);
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            conns.addAll(findAllConnections(childGroup));
+        }
+        return conns;
+    }
+
     //
     // Processor access
     //

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index e879e38..5a7aeec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -85,6 +85,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.remote.RemoteGroupPort;
@@ -184,7 +185,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         try {
             if (flowAlreadySynchronized) {
                 existingFlow = toBytes(controller);
-                existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() && controller.getAllReportingTasks().isEmpty() && controller.getAllControllerServices().isEmpty();
+                existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty()
+                    && controller.getAllReportingTasks().isEmpty()
+                    && controller.getAllControllerServices().isEmpty()
+                    && controller.getFlowRegistryClient().getRegistryIdentifiers().isEmpty();
             } else {
                 existingFlow = readFlowFromDisk();
                 if (existingFlow == null || existingFlow.length == 0) {
@@ -220,10 +224,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                         unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
                     }
 
+                    final boolean registriesPresent;
+                    final Element registriesElement = DomUtils.getChild(rootElement, "registries");
+                    if (registriesElement == null) {
+                        registriesPresent = false;
+                    } else {
+                        final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
+                        registriesPresent = !flowRegistryElems.isEmpty();
+                    }
+
                     logger.trace("Parsing process group from DOM");
                     final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
                     final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
-                    existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
+                    existingFlowEmpty = taskElements.isEmpty()
+                        && unrootedControllerServiceElements.isEmpty()
+                        && isEmpty(rootGroupDto)
+                        && registriesPresent;
                     logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
                 }
             }
@@ -318,6 +334,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                     // get the root group XML element
                     final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
 
+                    if (!flowAlreadySynchronized || existingFlowEmpty) {
+                        final Element registriesElement = DomUtils.getChild(rootElement, "registries");
+                        if (registriesElement != null) {
+                            final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
+                            for (final Element flowRegistryElement : flowRegistryElems) {
+                                final String registryId = getString(flowRegistryElement, "id");
+                                final String registryName = getString(flowRegistryElement, "name");
+                                final String registryUrl = getString(flowRegistryElement, "url");
+                                final String description = getString(flowRegistryElement, "description");
+
+                                final FlowRegistryClient client = controller.getFlowRegistryClient();
+                                client.addFlowRegistry(registryId, registryName, registryUrl, description);
+                            }
+                        }
+                    }
+
                     // if this controller isn't initialized or its empty, add the root group, otherwise update
                     final ProcessGroup rootGroup;
                     if (!flowAlreadySynchronized || existingFlowEmpty) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index ecf2438..f921bc6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -39,6 +39,8 @@ import org.apache.nifi.persistence.TemplateSerializer;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.VariableDescriptor;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
@@ -74,7 +76,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class StandardFlowSerializer implements FlowSerializer {
 
-    private static final String MAX_ENCODING_VERSION = "1.2";
+    private static final String MAX_ENCODING_VERSION = "1.3";
 
     private final StringEncryptor encryptor;
 
@@ -98,6 +100,11 @@ public class StandardFlowSerializer implements FlowSerializer {
             doc.appendChild(rootNode);
             addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
             addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
+
+            final Element registriesElement = doc.createElement("registries");
+            rootNode.appendChild(registriesElement);
+
+            addFlowRegistries(registriesElement, controller.getFlowRegistryClient());
             addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
 
             // Add root-level controller services
@@ -130,6 +137,26 @@ public class StandardFlowSerializer implements FlowSerializer {
         }
     }
 
+    private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) {
+        for (final String registryId : registryClient.getRegistryIdentifiers()) {
+            final FlowRegistry flowRegistry = registryClient.getFlowRegistry(registryId);
+
+            final Element registryElement = parentElement.getOwnerDocument().createElement("flowRegistry");
+            parentElement.appendChild(registryElement);
+
+            addStringElement(registryElement, "id", flowRegistry.getIdentifier());
+            addStringElement(registryElement, "name", flowRegistry.getName());
+            addStringElement(registryElement, "url", flowRegistry.getURL());
+            addStringElement(registryElement, "description", flowRegistry.getDescription());
+        }
+    }
+
+    private void addStringElement(final Element parentElement, final String elementName, final String value) {
+        final Element childElement = parentElement.getOwnerDocument().createElement(elementName);
+        childElement.setTextContent(value);
+        parentElement.appendChild(childElement);
+    }
+
     private void addSize(final Element parentElement, final Size size) {
         final Element element = parentElement.getOwnerDocument().createElement("size");
         element.setAttribute("width", String.valueOf(size.getWidth()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 3aa5084..e1846a0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.stream.Stream;
 
 import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
@@ -198,6 +199,21 @@ public class FingerprintFactory {
     }
 
     private StringBuilder addFlowControllerFingerprint(final StringBuilder builder, final Element flowControllerElem, final FlowController controller) {
+        // registries
+        final Element registriesElement = DomUtils.getChild(flowControllerElem, "registries");
+        if (registriesElement == null) {
+            builder.append("NO_VALUE");
+        } else {
+            final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
+            if (flowRegistryElems.isEmpty()) {
+                builder.append("NO_VALUE");
+            } else {
+                for (final Element flowRegistryElement : flowRegistryElems) {
+                    addFlowRegistryFingerprint(builder, flowRegistryElement);
+                }
+            }
+        }
+
         // root group
         final Element rootGroupElem = (Element) DomUtils.getChildNodesByTagName(flowControllerElem, "rootGroup").item(0);
         addProcessGroupFingerprint(builder, rootGroupElem, controller);
@@ -265,6 +281,11 @@ public class FingerprintFactory {
         return builder;
     }
 
+    private StringBuilder addFlowRegistryFingerprint(final StringBuilder builder, final Element flowRegistryElement) {
+        Stream.of("id", "name", "url", "description").forEach(elementName -> appendFirstValue(builder, DomUtils.getChildNodesByTagName(flowRegistryElement, elementName)));
+        return builder;
+    }
+
     private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3b8117b..1d8652e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -72,7 +72,7 @@ import org.apache.nifi.registry.flow.Bundle;
 import org.apache.nifi.registry.flow.ConnectableComponent;
 import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
 import org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.UnknownResourceException;
 import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -2835,11 +2835,14 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    @Override
     public void disconnectVersionControl() {
         writeLock.lock();
         try {
-            // TODO remove version component ids from each component (until another versioned PG is encountered)
             this.versionControlInfo.set(null);
+
+            // remove version component ids from each component (until another versioned PG is encountered)
+            applyVersionedComponentIds(this, id -> null);
         } finally {
             writeLock.unlock();
         }
@@ -2850,36 +2853,41 @@ public final class StandardProcessGroup implements ProcessGroup {
             return;
         }
 
-        processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier()));
+        applyVersionedComponentIds(processGroup, versionedComponentIds::get);
+    }
+
+    private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
+        processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
 
         processGroup.getConnections().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getProcessors().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getInputPorts().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getOutputPorts().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getLabels().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getFunnels().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getControllerServices(false).stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
 
         processGroup.getRemoteProcessGroups().stream()
             .forEach(rpg -> {
-                rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier()));
+                rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
 
                 rpg.getInputPorts().stream()
-                    .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+                    .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
 
                 rpg.getOutputPorts().stream()
-                    .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+                    .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
             });
 
         processGroup.getProcessGroups().stream()
-            .forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds));
+            .filter(childGroup -> childGroup.getVersionControlInformation() != null)
+            .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
     }
 
 
@@ -2931,10 +2939,10 @@ public final class StandardProcessGroup implements ProcessGroup {
 
 
     @Override
-    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) {
+    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) {
         writeLock.lock();
         try {
-            verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also
+            verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
 
             final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
             final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
@@ -2950,15 +2958,15 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier())
                 .collect(Collectors.toSet());
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
-            } else {
-                // TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient
-                // than having to remember to enable DEBUG level logging every time a full build is done.
-                LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
+            if (LOG.isInfoEnabled()) {
+                final String differencesByLine = flowComparison.getDifferences().stream()
+                    .map(FlowDifference::toString)
+                    .collect(Collectors.joining("\n"));
+
+                LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
             }
 
-            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false);
+            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings);
         } catch (final ProcessorInstantiationException pie) {
             throw new RuntimeException(pie);
         } finally {
@@ -2968,10 +2976,14 @@ public final class StandardProcessGroup implements ProcessGroup {
 
 
     private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
-        final Set<String> updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException {
+        final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException {
 
         group.setComments(proposed.getComments());
-        group.setName(proposed.getName());
+
+        if (updateName) {
+            group.setName(proposed.getName());
+        }
+
         if (updatePosition && proposed.getPosition() != null) {
             group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
         }
@@ -2998,7 +3010,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         group.setVariables(updatedVariableMap);
 
-        final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates();
+        final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
         if (remoteCoordinates != null) {
             final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
             final String bucketId = remoteCoordinates.getBucketId();
@@ -3022,7 +3034,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed);
                 LOG.info("Added {} to {}", added, this);
             } else {
-                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true);
+                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName);
                 LOG.info("Updated {}", childGroup);
             }
 
@@ -3136,14 +3148,29 @@ public final class StandardProcessGroup implements ProcessGroup {
         final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
             .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
         final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
+        final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
 
         for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
             final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
             if (processor == null) {
                 final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed);
+
+                final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
+                    .map(relName -> added.getRelationship(relName))
+                    .collect(Collectors.toSet());
+                autoTerminatedRelationships.put(added, proposedAutoTerminated);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
                 updateProcessor(processor, proposedProcessor);
+
+                final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
+                    .map(relName -> processor.getRelationship(relName))
+                    .collect(Collectors.toSet());
+
+                if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
+                    autoTerminatedRelationships.put(processor, proposedAutoTerminated);
+                }
+
                 LOG.info("Updated {}", processor);
             } else {
                 processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@@ -3205,6 +3232,13 @@ public final class StandardProcessGroup implements ProcessGroup {
             group.removeConnection(connection);
         }
 
+        // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
+        // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
+        // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
+        // Connection for that relationship exists. This will throw an Exception.
+        autoTerminatedRelationships.forEach((proc, rels) -> proc.setAutoTerminatedRelationships(rels));
+
+        // Remove all controller services no longer in use
         for (final String removedVersionedId : controllerServicesRemoved) {
             final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", service, group);
@@ -3276,7 +3310,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
         group.setVersionedComponentId(proposed.getIdentifier());
         addProcessGroup(group);
-        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true);
+        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true);
         return group;
     }
 
@@ -3535,10 +3569,6 @@ public final class StandardProcessGroup implements ProcessGroup {
         processor.setYieldPeriod(proposed.getYieldDuration());
         processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
 
-        processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream()
-            .map(relName -> processor.getRelationship(relName))
-            .collect(Collectors.toSet()));
-
         if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
             final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
             final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
@@ -3547,6 +3577,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+
     private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
         final Map<String, String> fullPropertyMap = new HashMap<>();
         for (final PropertyDescriptor property : currentProperties.keySet()) {
@@ -3646,7 +3677,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             @Override
             public String getName() {
-                return "Flow Under Version Control";
+                return "Versioned Flow";
             }
         };
 
@@ -3659,7 +3690,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             .findAny()
             .isPresent();
 
-        LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences);
+        LOG.debug("There are {} differences between this Local FLow and the Versioned Flow: {}", differences.size(), differences);
         return Optional.of(modified);
     }
 
@@ -3669,27 +3700,24 @@ public final class StandardProcessGroup implements ProcessGroup {
         readLock.lock();
         try {
             final VersionControlInformation versionControlInfo = getVersionControlInformation();
-            if (versionControlInfo == null) {
-                throw new IllegalStateException("Cannot update the Version of the flow for " + this
-                    + " because the Process Group is not currently under Version Control");
-            }
-
-            if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
-                throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
-            }
-
-            if (verifyNotDirty) {
-                final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
-                if (!modifiedOption.isPresent()) {
-                    throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
-                        + "has not yet been synchronized with the Flow Registry. The Process Group must be"
-                        + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+            if (versionControlInfo != null) {
+                if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
+                    throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
                 }
 
-                if (Boolean.TRUE.equals(modifiedOption.get())) {
-                    throw new IllegalStateException("Cannot change the Version of the flow for " + this
-                        + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
-                        + " restored to its original form before changing the version");
+                if (verifyNotDirty) {
+                    final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
+                    if (!modifiedOption.isPresent()) {
+                        throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
+                            + "has not yet been synchronized with the Flow Registry. The Process Group must be"
+                            + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+                    }
+
+                    if (Boolean.TRUE.equals(modifiedOption.get())) {
+                        throw new IllegalStateException("Cannot change the Version of the flow for " + this
+                            + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
+                            + " restored to its original form before changing the version");
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
new file mode 100644
index 0000000..da5880c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.bucket.Bucket;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A simple file-based implementation of a Flow Registry Client. Rather than interacting
+ * with an actual Flow Registry, this implementation simply reads flows from disk and writes
+ * them to disk. It is not meant for any production use but is available for testing purposes.
+ */
+public class FileBasedFlowRegistry implements FlowRegistry {
+    private final File directory;
+    private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
+    private final JsonFactory jsonFactory = new JsonFactory();
+    private final String id;
+    private volatile String name = "Local Registry";
+    private volatile String url = "file:" + (new File("..").getAbsolutePath());
+    private volatile String description = "Default file-based Flow Registry";
+
+    public FileBasedFlowRegistry(final String id, final String url) throws IOException {
+        final URI uri = URI.create(url);
+        if (!uri.getScheme().equalsIgnoreCase("file")) {
+            throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'");
+        }
+
+        this.directory = new File(URI.create(url).getPath());
+
+        if (!directory.exists() && !directory.mkdirs()) {
+            throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
+        }
+
+        this.id = id;
+        this.url = url;
+        recoverBuckets();
+    }
+
+    private void recoverBuckets() throws IOException {
+        final File[] bucketDirs = directory.listFiles();
+        if (bucketDirs == null) {
+            throw new IOException("Could not get listing of directory " + directory);
+        }
+
+        for (final File bucketDir : bucketDirs) {
+            final File[] flowDirs = bucketDir.listFiles();
+            if (flowDirs == null) {
+                throw new IOException("Could not get listing of directory " + bucketDir);
+            }
+
+            final Set<String> flowNames = new HashSet<>();
+            for (final File flowDir : flowDirs) {
+                final File propsFile = new File(flowDir, "flow.properties");
+                if (!propsFile.exists()) {
+                    continue;
+                }
+
+                final Properties properties = new Properties();
+                try (final InputStream in = new FileInputStream(propsFile)) {
+                    properties.load(in);
+                }
+
+                final String flowName = properties.getProperty("name");
+                if (flowName == null) {
+                    continue;
+                }
+
+                flowNames.add(flowName);
+            }
+
+            if (!flowNames.isEmpty()) {
+                flowNamesByBucket.put(bucketDir.getName(), flowNames);
+            }
+        }
+    }
+
+    @Override
+    public String getURL() {
+        return url;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public Set<Bucket> getBuckets(NiFiUser user) throws IOException {
+        final Set<Bucket> buckets = new HashSet<>();
+
+        final File[] bucketDirs = directory.listFiles();
+        if (bucketDirs == null) {
+            throw new IOException("Could not get listing of directory " + directory);
+        }
+
+        for (final File bucketDirectory : bucketDirs) {
+            final String bucketIdentifier = bucketDirectory.getName();
+            final long creation = bucketDirectory.lastModified();
+
+            final Bucket bucket = new Bucket();
+            bucket.setIdentifier(bucketIdentifier);
+            bucket.setName("Bucket '" + bucketIdentifier + "'");
+            bucket.setCreatedTimestamp(creation);
+
+            final Set<VersionedFlow> versionedFlows = new HashSet<>();
+            final File[] flowDirs = bucketDirectory.listFiles();
+            if (flowDirs != null) {
+                for (final File flowDir : flowDirs) {
+                    final String flowIdentifier = flowDir.getName();
+                    try {
+                        final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
+                        versionedFlows.add(versionedFlow);
+                    } catch (UnknownResourceException e) {
+                        continue;
+                    }
+                }
+            }
+
+            bucket.setVersionedFlows(versionedFlows);
+
+            buckets.add(bucket);
+        }
+
+        return buckets;
+    }
+
+
+    @Override
+    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
+        Objects.requireNonNull(flow);
+        Objects.requireNonNull(flow.getBucketIdentifier());
+        Objects.requireNonNull(flow.getName());
+
+        // Verify that bucket exists
+        final File bucketDir = new File(directory, flow.getBucketIdentifier());
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+        }
+
+        // Verify that there is no flow with the same name in that bucket
+        final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier());
+        if (flowNames != null && flowNames.contains(flow.getName())) {
+            throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier());
+        }
+
+        final String flowIdentifier = UUID.randomUUID().toString();
+        final File flowDir = new File(bucketDir, flowIdentifier);
+        if (!flowDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + flowDir + " for new Flow");
+        }
+
+        final File propertiesFile = new File(flowDir, "flow.properties");
+
+        final Properties flowProperties = new Properties();
+        flowProperties.setProperty("name", flow.getName());
+        flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
+        flowProperties.setProperty("description", flow.getDescription());
+        flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
+
+        try (final OutputStream out = new FileOutputStream(propertiesFile)) {
+            flowProperties.store(out, null);
+        }
+
+        final VersionedFlow response = new VersionedFlow();
+        response.setBucketIdentifier(flow.getBucketIdentifier());
+        response.setCreatedTimestamp(flow.getCreatedTimestamp());
+        response.setDescription(flow.getDescription());
+        response.setIdentifier(flowIdentifier);
+        response.setModifiedTimestamp(flow.getModifiedTimestamp());
+        response.setName(flow.getName());
+
+        return response;
+    }
+
+    @Override
+    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
+            throws IOException, UnknownResourceException {
+        Objects.requireNonNull(flow);
+        Objects.requireNonNull(flow.getBucketIdentifier());
+        Objects.requireNonNull(flow.getName());
+        Objects.requireNonNull(snapshot);
+
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, flow.getBucketIdentifier());
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flow.getIdentifier());
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
+        }
+
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Unable to perform listing of directory " + flowDir);
+        }
+
+        int maxVersion = 0;
+        for (final File versionDir : versionDirs) {
+            final String versionName = versionDir.getName();
+
+            final int version;
+            try {
+                version = Integer.parseInt(versionName);
+            } catch (final NumberFormatException nfe) {
+                continue;
+            }
+
+            if (version > maxVersion) {
+                maxVersion = version;
+            }
+        }
+
+        final int snapshotVersion = maxVersion + 1;
+        final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
+        if (!snapshotDir.mkdir()) {
+            throw new IOException("Could not create directory " + snapshotDir);
+        }
+
+        final File contentsFile = new File(snapshotDir, "flow.xml");
+
+        try (final OutputStream out = new FileOutputStream(contentsFile);
+            final JsonGenerator generator = jsonFactory.createGenerator(out)) {
+            generator.setCodec(new ObjectMapper());
+            generator.setPrettyPrinter(new DefaultPrettyPrinter());
+            generator.writeObject(snapshot);
+        }
+
+        final Properties snapshotProperties = new Properties();
+        snapshotProperties.setProperty("comments", comments);
+        snapshotProperties.setProperty("name", flow.getName());
+        final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
+        try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
+            snapshotProperties.store(out, null);
+        }
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
+        snapshotMetadata.setComments(comments);
+        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
+        snapshotMetadata.setFlowName(flow.getName());
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(snapshotVersion);
+
+        final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
+        response.setSnapshotMetadata(snapshotMetadata);
+        response.setFlowContents(snapshot);
+        return response;
+    }
+
+    @Override
+    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+        }
+
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Unable to perform listing of directory " + flowDir);
+        }
+
+        int maxVersion = 0;
+        for (final File versionDir : versionDirs) {
+            final String versionName = versionDir.getName();
+
+            final int version;
+            try {
+                version = Integer.parseInt(versionName);
+            } catch (final NumberFormatException nfe) {
+                continue;
+            }
+
+            if (version > maxVersion) {
+                maxVersion = version;
+            }
+        }
+
+        return maxVersion;
+    }
+
+    @Override
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+        }
+
+        final File versionDir = new File(flowDir, String.valueOf(version));
+        if (!versionDir.exists()) {
+            throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
+        }
+
+        final File contentsFile = new File(versionDir, "flow.xml");
+
+        final VersionedProcessGroup processGroup;
+        try (final JsonParser parser = jsonFactory.createParser(contentsFile)) {
+            final ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+            parser.setCodec(mapper);
+            processGroup = parser.readValueAs(VersionedProcessGroup.class);
+        }
+
+        final Properties properties = new Properties();
+        final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
+        try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
+            properties.load(in);
+        }
+
+        final String comments = properties.getProperty("comments");
+        final String flowName = properties.getProperty("name");
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucketId);
+        snapshotMetadata.setComments(comments);
+        snapshotMetadata.setFlowIdentifier(flowId);
+        snapshotMetadata.setFlowName(flowName);
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(version);
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(processGroup);
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+
+        return snapshot;
+    }
+
+    @Override
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+        }
+
+        final File flowPropsFile = new File(flowDir, "flow.properties");
+        final Properties flowProperties = new Properties();
+        try (final InputStream in = new FileInputStream(flowPropsFile)) {
+            flowProperties.load(in);
+        }
+
+        final VersionedFlow flow = new VersionedFlow();
+        flow.setBucketIdentifier(bucketId);
+        flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
+        flow.setDescription(flowProperties.getProperty("description"));
+        flow.setIdentifier(flowId);
+        flow.setModifiedTimestamp(flowDir.lastModified());
+        flow.setName(flowProperties.getProperty("name"));
+
+        final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
+
+        final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
+        flow.setSnapshotMetadata(snapshotMetadataSet);
+
+        final File[] versionDirs = flowDir.listFiles();
+        flow.setVersionCount(versionDirs.length);
+
+        for (final File file : versionDirs) {
+            if (!file.isDirectory()) {
+                continue;
+            }
+
+            int version;
+            try {
+                version = Integer.parseInt(file.getName());
+            } catch (final NumberFormatException nfe) {
+                // not a version. skip.
+                continue;
+            }
+
+            final File snapshotPropsFile = new File(file, "snapshot.properties");
+            final Properties snapshotProperties = new Properties();
+            try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
+                snapshotProperties.load(in);
+            }
+
+            final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+            metadata.setBucketIdentifier(bucketId);
+            metadata.setComments(snapshotProperties.getProperty("comments"));
+            metadata.setFlowIdentifier(flowId);
+            metadata.setFlowName(snapshotProperties.getProperty("name"));
+            metadata.setTimestamp(file.lastModified());
+            metadata.setVersion(version);
+
+            snapshotMetadataSet.add(metadata);
+        }
+
+        return flow;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return id;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    @Override
+    public void setURL(String url) {
+        this.url = url;
+    }
+
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
deleted file mode 100644
index 2cc39c6..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.registry.flow;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.registry.bucket.Bucket;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-
-/**
- * A simple file-based implementation of a Flow Registry Client. Rather than interacting
- * with an actual Flow Registry, this implementation simply reads flows from disk and writes
- * them to disk. It is not meant for any production use but is available for testing purposes.
- */
-public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry {
-    private final File directory;
-    private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
-    private final JsonFactory jsonFactory = new JsonFactory();
-
-    public FileBasedFlowRegistryClient(final File directory) throws IOException {
-        if (!directory.exists() && !directory.mkdirs()) {
-            throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
-        }
-
-        this.directory = directory;
-        recoverBuckets();
-    }
-
-    private void recoverBuckets() throws IOException {
-        final File[] bucketDirs = directory.listFiles();
-        if (bucketDirs == null) {
-            throw new IOException("Could not get listing of directory " + directory);
-        }
-
-        for (final File bucketDir : bucketDirs) {
-            final File[] flowDirs = bucketDir.listFiles();
-            if (flowDirs == null) {
-                throw new IOException("Could not get listing of directory " + bucketDir);
-            }
-
-            final Set<String> flowNames = new HashSet<>();
-            for (final File flowDir : flowDirs) {
-                final File propsFile = new File(flowDir, "flow.properties");
-                if (!propsFile.exists()) {
-                    continue;
-                }
-
-                final Properties properties = new Properties();
-                try (final InputStream in = new FileInputStream(propsFile)) {
-                    properties.load(in);
-                }
-
-                final String flowName = properties.getProperty("name");
-                if (flowName == null) {
-                    continue;
-                }
-
-                flowNames.add(flowName);
-            }
-
-            if (!flowNames.isEmpty()) {
-                flowNamesByBucket.put(bucketDir.getName(), flowNames);
-            }
-        }
-    }
-
-    @Override
-    public FlowRegistry getFlowRegistry(final String registryId) {
-        if (!"default".equals(registryId)) {
-            return null;
-        }
-
-        return this;
-    }
-
-    @Override
-    public String getURL() {
-        return directory.toURI().toString();
-    }
-
-    @Override
-    public String getName() {
-        return "Local Registry";
-    }
-
-    @Override
-    public Set<Bucket> getBuckets(NiFiUser user) throws IOException {
-        final Set<Bucket> buckets = new HashSet<>();
-
-        final File[] bucketDirs = directory.listFiles();
-        if (bucketDirs == null) {
-            throw new IOException("Could not get listing of directory " + directory);
-        }
-
-        for (final File bucketDirectory : bucketDirs) {
-            final String bucketIdentifier = bucketDirectory.getName();
-            final long creation = bucketDirectory.lastModified();
-
-            final Bucket bucket = new Bucket();
-            bucket.setIdentifier(bucketIdentifier);
-            bucket.setName("Bucket '" + bucketIdentifier + "'");
-            bucket.setCreatedTimestamp(creation);
-
-            buckets.add(bucket);
-        }
-
-        return buckets;
-    }
-
-    @Override
-    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
-        Objects.requireNonNull(flow);
-        Objects.requireNonNull(flow.getBucketIdentifier());
-        Objects.requireNonNull(flow.getName());
-
-        // Verify that bucket exists
-        final File bucketDir = new File(directory, flow.getBucketIdentifier());
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
-        }
-
-        // Verify that there is no flow with the same name in that bucket
-        final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier());
-        if (flowNames != null && flowNames.contains(flow.getName())) {
-            throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier());
-        }
-
-        final String flowIdentifier = UUID.randomUUID().toString();
-        final File flowDir = new File(bucketDir, flowIdentifier);
-        if (!flowDir.mkdirs()) {
-            throw new IOException("Failed to create directory " + flowDir + " for new Flow");
-        }
-
-        final File propertiesFile = new File(flowDir, "flow.properties");
-
-        final Properties flowProperties = new Properties();
-        flowProperties.setProperty("name", flow.getName());
-        flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
-        flowProperties.setProperty("description", flow.getDescription());
-        flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
-
-        try (final OutputStream out = new FileOutputStream(propertiesFile)) {
-            flowProperties.store(out, null);
-        }
-
-        final VersionedFlow response = new VersionedFlow();
-        response.setBucketIdentifier(flow.getBucketIdentifier());
-        response.setCreatedTimestamp(flow.getCreatedTimestamp());
-        response.setDescription(flow.getDescription());
-        response.setIdentifier(flowIdentifier);
-        response.setModifiedTimestamp(flow.getModifiedTimestamp());
-        response.setName(flow.getName());
-
-        return response;
-    }
-
-    @Override
-    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
-            throws IOException, UnknownResourceException {
-        Objects.requireNonNull(flow);
-        Objects.requireNonNull(flow.getBucketIdentifier());
-        Objects.requireNonNull(flow.getName());
-        Objects.requireNonNull(snapshot);
-
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, flow.getBucketIdentifier());
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flow.getIdentifier());
-        if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
-        }
-
-        final File[] versionDirs = flowDir.listFiles();
-        if (versionDirs == null) {
-            throw new IOException("Unable to perform listing of directory " + flowDir);
-        }
-
-        int maxVersion = 0;
-        for (final File versionDir : versionDirs) {
-            final String versionName = versionDir.getName();
-
-            final int version;
-            try {
-                version = Integer.parseInt(versionName);
-            } catch (final NumberFormatException nfe) {
-                continue;
-            }
-
-            if (version > maxVersion) {
-                maxVersion = version;
-            }
-        }
-
-        final int snapshotVersion = maxVersion + 1;
-        final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
-        if (!snapshotDir.mkdir()) {
-            throw new IOException("Could not create directory " + snapshotDir);
-        }
-
-        final File contentsFile = new File(snapshotDir, "flow.xml");
-
-        try (final OutputStream out = new FileOutputStream(contentsFile);
-            final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) {
-            generator.setCodec(new ObjectMapper());
-            generator.setPrettyPrinter(new DefaultPrettyPrinter());
-            generator.writeObject(snapshot);
-        }
-
-        final Properties snapshotProperties = new Properties();
-        snapshotProperties.setProperty("comments", comments);
-        snapshotProperties.setProperty("name", flow.getName());
-        final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
-        try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
-            snapshotProperties.store(out, null);
-        }
-
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
-        snapshotMetadata.setComments(comments);
-        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
-        snapshotMetadata.setFlowName(flow.getName());
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(snapshotVersion);
-
-        final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
-        response.setSnapshotMetadata(snapshotMetadata);
-        response.setFlowContents(snapshot);
-        return response;
-    }
-
-    @Override
-    public Set<String> getRegistryIdentifiers() {
-        return Collections.singleton("default");
-    }
-
-    @Override
-    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
-        }
-
-        final File[] versionDirs = flowDir.listFiles();
-        if (versionDirs == null) {
-            throw new IOException("Unable to perform listing of directory " + flowDir);
-        }
-
-        int maxVersion = 0;
-        for (final File versionDir : versionDirs) {
-            final String versionName = versionDir.getName();
-
-            final int version;
-            try {
-                version = Integer.parseInt(versionName);
-            } catch (final NumberFormatException nfe) {
-                continue;
-            }
-
-            if (version > maxVersion) {
-                maxVersion = version;
-            }
-        }
-
-        return maxVersion;
-    }
-
-    @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
-        }
-
-        final File versionDir = new File(flowDir, String.valueOf(version));
-        if (!versionDir.exists()) {
-            throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
-        }
-
-        final File contentsFile = new File(versionDir, "flow.xml");
-
-        final VersionedProcessGroup processGroup;
-        try (final JsonParser parser = jsonFactory.createParser(contentsFile)) {
-            final ObjectMapper mapper = new ObjectMapper();
-            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-            parser.setCodec(mapper);
-            processGroup = parser.readValueAs(VersionedProcessGroup.class);
-        }
-
-        final Properties properties = new Properties();
-        final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
-        try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
-            properties.load(in);
-        }
-
-        final String comments = properties.getProperty("comments");
-        final String flowName = properties.getProperty("name");
-
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setBucketIdentifier(bucketId);
-        snapshotMetadata.setComments(comments);
-        snapshotMetadata.setFlowIdentifier(flowId);
-        snapshotMetadata.setFlowName(flowName);
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(version);
-
-        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
-        snapshot.setFlowContents(processGroup);
-        snapshot.setSnapshotMetadata(snapshotMetadata);
-
-        return snapshot;
-    }
-
-    @Override
-    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
-        }
-
-        final File flowPropsFile = new File(flowDir, "flow.properties");
-        final Properties flowProperties = new Properties();
-        try (final InputStream in = new FileInputStream(flowPropsFile)) {
-            flowProperties.load(in);
-        }
-
-        final VersionedFlow flow = new VersionedFlow();
-        flow.setBucketIdentifier(bucketId);
-        flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
-        flow.setDescription(flowProperties.getProperty("description"));
-        flow.setIdentifier(flowId);
-        flow.setModifiedTimestamp(flowDir.lastModified());
-        flow.setName(flowProperties.getProperty("name"));
-
-        final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
-
-        final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
-        flow.setSnapshotMetadata(snapshotMetadataSet);
-
-        final File[] versionDirs = flowDir.listFiles();
-        for (final File file : versionDirs) {
-            if (!file.isDirectory()) {
-                continue;
-            }
-
-            int version;
-            try {
-                version = Integer.parseInt(file.getName());
-            } catch (final NumberFormatException nfe) {
-                // not a version. skip.
-                continue;
-            }
-
-            final File snapshotPropsFile = new File(file, "snapshot.properties");
-            final Properties snapshotProperties = new Properties();
-            try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
-                snapshotProperties.load(in);
-            }
-
-            final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
-            metadata.setBucketIdentifier(bucketId);
-            metadata.setComments(snapshotProperties.getProperty("comments"));
-            metadata.setFlowIdentifier(flowId);
-            metadata.setFlowName(snapshotProperties.getProperty("name"));
-            metadata.setTimestamp(file.lastModified());
-            metadata.setVersion(version);
-
-            snapshotMetadataSet.add(metadata);
-        }
-
-        return flow;
-    }
-}