You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/10/26 18:39:53 UTC

[nifi] branch main updated: NIFI-10673: When a component is added to a ProcessGroup, we had a method called ensureUniqueVersionControlId but the method only took into accounts in that Process Group. Updated methods to now consider components in any Process Group that is part of the same Versioned Flow. Also added system test to verify the problem and the fix. Added Toolkit clients and additional methods as necessary in order to implement system tests

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d390a0becf NIFI-10673: When a component is added to a ProcessGroup, we had a method called ensureUniqueVersionControlId but the method only took into accounts in that Process Group. Updated methods to now consider components in any Process Group that is part of the same Versioned Flow. Also added system test to verify the problem and the fix. Added Toolkit clients and additional methods as necessary in order to implement system tests
d390a0becf is described below

commit d390a0becf4ee88dce4094e59bb3ddc1b7786585
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Sep 26 10:19:32 2022 -0400

    NIFI-10673: When a component is added to a ProcessGroup, we had a method called ensureUniqueVersionControlId but the method only took into accounts in that Process Group. Updated methods to now consider components in any Process Group that is part of the same Versioned Flow. Also added system test to verify the problem and the fix. Added Toolkit clients and additional methods as necessary in order to implement system tests
    
    NIFI-10673: Addresed checkstyle issue
    
    NIFI-10673: Removed TODO comment
    Signed-off-by: Matthew Burgess <ma...@apache.org>
---
 .../web/api/dto/VersionControlInformationDTO.java  |   9 +-
 .../apache/nifi/groups/StandardProcessGroup.java   |  72 ++++-
 .../apache/nifi/web/StandardNiFiServiceFacade.java |   5 +-
 .../org/apache/nifi/web/api/VersionsResource.java  |   2 +-
 .../nifi-system-test-extensions/pom.xml            |   4 +
 .../registry/FileSystemFlowRegistryClient.java     | 350 +++++++++++++++++++++
 ...rg.apache.nifi.registry.flow.FlowRegistryClient |  16 +
 .../apache/nifi/tests/system/NiFiClientUtil.java   | 117 +++++++
 .../tests/system/registry/RegistryClientIT.java    | 217 +++++++++++++
 .../toolkit/cli/impl/client/NiFiClientFactory.java |  11 +
 .../toolkit/cli/impl/client/nifi/NiFiClient.java   |   6 +
 .../cli/impl/client/nifi/ProcessGroupClient.java   |   5 +
 .../{VersionsClient.java => SnippetClient.java}    |  22 +-
 .../cli/impl/client/nifi/VersionsClient.java       |   9 +
 .../impl/client/nifi/impl/JerseyNiFiClient.java    |  11 +
 .../client/nifi/impl/JerseyProcessGroupClient.java |  24 ++
 .../impl/client/nifi/impl/JerseySnippetClient.java |  54 ++++
 .../client/nifi/impl/JerseyVersionsClient.java     |  79 +++++
 18 files changed, 984 insertions(+), 29 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
index e8e124d5f0..e78c37e4a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
@@ -23,6 +23,13 @@ import javax.xml.bind.annotation.XmlType;
 
 @XmlType(name = "versionControlInformation")
 public class VersionControlInformationDTO {
+    public static final String LOCALLY_MODIFIED = "LOCALLY_MODIFIED";
+    public static final String STALE = "STALE";
+    public static final String LOCALLY_MODIFIED_AND_STALE = "LOCALLY_MODIFIED_AND_STALE";
+    public static final String UP_TO_DATE = "UP_TO_DATE";
+    public static final String SYNC_FAILURE = "SYNC_FAILURE";
+    private static final String ALLOWABLE_STATES = String.join(", ", LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE, SYNC_FAILURE);
+
     private String groupId;
     private String registryId;
     private String registryName;
@@ -128,7 +135,7 @@ public class VersionControlInformationDTO {
 
     @ApiModelProperty(accessMode = ApiModelProperty.AccessMode.READ_ONLY,
         value = "The current state of the Process Group, as it relates to the Versioned Flow",
-        allowableValues = "LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE, SYNC_FAILURE")
+        allowableValues = LOCALLY_MODIFIED + ", " + STALE + ", " + LOCALLY_MODIFIED_AND_STALE + ", " + UP_TO_DATE + ", " + SYNC_FAILURE)
     public String getState() {
         return state;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 496a43cfc5..f021cd7ad3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -611,7 +611,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         try {
             // Unique port check within the same group.
             verifyPortUniqueness(port, inputPorts, this::getInputPortByName);
-            ensureUniqueVersionControlId(port, getInputPorts());
+            ensureUniqueVersionControlId(port, ProcessGroup::getInputPorts);
 
             port.setProcessGroup(this);
             inputPorts.put(requireNonNull(port).getIdentifier(), port);
@@ -695,7 +695,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         try {
             // Unique port check within the same group.
             verifyPortUniqueness(port, outputPorts, this::getOutputPortByName);
-            ensureUniqueVersionControlId(port, getOutputPorts());
+            ensureUniqueVersionControlId(port, ProcessGroup::getOutputPorts);
 
             port.setProcessGroup(this);
             outputPorts.put(port.getIdentifier(), port);
@@ -771,7 +771,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         writeLock.lock();
         try {
-            ensureUniqueVersionControlId(group, getProcessGroups());
+            ensureUniqueVersionControlId(group, ProcessGroup::getProcessGroups);
 
             group.setParent(this);
             group.getVariableRegistry().setParent(getVariableRegistry());
@@ -880,7 +880,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier());
             }
 
-            ensureUniqueVersionControlId(remoteGroup, getRemoteProcessGroups());
+            ensureUniqueVersionControlId(remoteGroup, ProcessGroup::getRemoteProcessGroups);
             remoteGroup.setProcessGroup(this);
             remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup);
             onComponentModified();
@@ -962,7 +962,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId);
             }
 
-            ensureUniqueVersionControlId(processor, getProcessors());
+            ensureUniqueVersionControlId(processor, ProcessGroup::getProcessors);
 
             processor.setProcessGroup(this);
             processor.getVariableRegistry().setParent(getVariableRegistry());
@@ -977,6 +977,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+
     /**
      * A component's Versioned Component ID is used to link a component on the canvas to a component in a versioned flow.
      * There may, however, be multiple instances of the same versioned flow in a single NiFi instance. In this case, we will have
@@ -989,16 +990,19 @@ public final class StandardProcessGroup implements ProcessGroup {
      * is copied & pasted instead of being moved whenever a conflict occurs.
      *
      * @param component the component whose Versioned Component ID should be nulled if there's a conflict
-     * @param componentsToCheck the components to check to determine if there's a conflict
+     * @param extractComponents a function to obtain, from a given Process Group, the components to check to determine if there's a conflict
      */
-    private void ensureUniqueVersionControlId(final org.apache.nifi.components.VersionedComponent component,
-                                              final Collection<? extends org.apache.nifi.components.VersionedComponent> componentsToCheck) {
+    private <T extends org.apache.nifi.components.VersionedComponent> void ensureUniqueVersionControlId(final org.apache.nifi.components.VersionedComponent component,
+                                              final Function<ProcessGroup, Collection<T>> extractComponents) {
         final Optional<String> optionalVersionControlId = component.getVersionedComponentId();
         if (!optionalVersionControlId.isPresent()) {
             return;
         }
 
         final String versionControlId = optionalVersionControlId.get();
+
+        final ProcessGroup versionedGroup = getVersionedAncestorOrSelf().orElse(this);
+        final Set<T> componentsToCheck = getComponentsInVersionedFlow(versionedGroup, extractComponents);
         final boolean duplicateId = containsVersionedComponentId(componentsToCheck, versionControlId);
 
         if (duplicateId) {
@@ -1009,6 +1013,52 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    /**
+     * If this Process Group is under version control, returns <code>this</code>. Otherwise, returns the nearest parent/ancestor group
+     * that is under version control. In the event that no Process Group in the chain up to the root group is currently under version control,
+     * will return an empty optional.
+     * @return the nearest Process Group in the chain up to <code>this</code> that is currently under version control, or an empty optional.
+     */
+    private Optional<ProcessGroup> getVersionedAncestorOrSelf() {
+        return getVersionedAncestorOrSelf(this);
+    }
+
+    private Optional<ProcessGroup> getVersionedAncestorOrSelf(final ProcessGroup start) {
+        if (start == null) {
+            return Optional.empty();
+        }
+
+        if (start.getVersionControlInformation() != null) {
+            return Optional.of(start);
+        }
+
+        return getVersionedAncestorOrSelf(start.getParent());
+    }
+
+    /**
+     * Extracts all components from the given Process Group, recursively, but does not include any child group that is directly version controlled.
+     * @param group the highest-level Process Group to extract components from
+     * @param extractComponents a function that extracts the appropriate components from a given Process Group
+     * @return the set of all components in the given Process Group and children/descendant groups, excluding any child/descendant group(s) that are directly version controlled.
+     */
+    private <T extends org.apache.nifi.components.VersionedComponent> Set<T> getComponentsInVersionedFlow(final ProcessGroup group, final Function<ProcessGroup, Collection<T>> extractComponents) {
+        final Set<T> accumulated = new HashSet<>();
+        getComponentsInVersionedFlow(group, extractComponents, accumulated);
+        return accumulated;
+    }
+
+    private <T> void getComponentsInVersionedFlow(final ProcessGroup group, final Function<ProcessGroup, Collection<T>> extractComponents, final Set<T> accumulated) {
+        final Collection<T> components = extractComponents.apply(group);
+        accumulated.addAll(components);
+
+        for (final ProcessGroup child : group.getProcessGroups()) {
+            if (child.getVersionControlInformation() == null) {
+                getComponentsInVersionedFlow(child, extractComponents, accumulated);
+            }
+        }
+    }
+
+
     private boolean containsVersionedComponentId(final Collection<? extends org.apache.nifi.components.VersionedComponent> components, final String id) {
         for (final org.apache.nifi.components.VersionedComponent component : components) {
             final Optional<String> optionalConnectableId = component.getVersionedComponentId();
@@ -1268,7 +1318,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
             }
 
-            ensureUniqueVersionControlId(connection, getConnections());
+            ensureUniqueVersionControlId(connection, ProcessGroup::getConnections);
             connection.setProcessGroup(this);
             source.addConnection(connection);
             if (source != destination) {  // don't call addConnection twice if it's a self-looping connection.
@@ -1485,7 +1535,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier());
             }
 
-            ensureUniqueVersionControlId(label, getLabels());
+            ensureUniqueVersionControlId(label, ProcessGroup::getLabels);
             label.setProcessGroup(this);
             labels.put(label.getIdentifier(), label);
             onComponentModified();
@@ -2236,7 +2286,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier());
             }
 
-            ensureUniqueVersionControlId(funnel, getFunnels());
+            ensureUniqueVersionControlId(funnel, ProcessGroup::getFunnels);
 
             funnel.setProcessGroup(this);
             funnels.put(funnel.getIdentifier(), funnel);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 1d6bc59d51..30c03dc06e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5190,10 +5190,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         } catch (final FlowRegistryException e) {
             logger.error(e.getMessage(), e);
             throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
-                    + bucketId + ", Flow " + flowId + ", Version " + flowVersion);
+                    + bucketId + ", Flow " + flowId + ", Version " + flowVersion, e);
         } catch (final IOException ioe) {
-            throw new IllegalStateException(
-                    "Failed to communicate with Flow Registry when attempting to retrieve a versioned flow");
+            throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to retrieve a versioned flow", ioe);
         }
 
         return snapshot;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 5fb78ba8c9..a212ac7383 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -1082,7 +1082,7 @@ public class VersionsResource extends FlowUpdateResource<VersionControlInformati
         @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
     })
     public Response initiateRevertFlowVersion(@ApiParam("The process group id.") @PathParam("id") final String groupId,
-        @ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) {
+        @ApiParam(value = "The Version Control Information to revert to.", required = true) final VersionControlInformationEntity requestEntity) {
 
         if (requestEntity == null) {
             throw new IllegalArgumentException("Version control information must be specified.");
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
index d0eddfdcfa..7676d15ec2 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/pom.xml
@@ -44,5 +44,9 @@
             <artifactId>nifi-bin-manager</artifactId>
             <version>1.19.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
new file mode 100644
index 0000000000..f47846361c
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.registry;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.registry.flow.AbstractFlowRegistryClient;
+import org.apache.nifi.registry.flow.FlowRegistryBucket;
+import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
+import org.apache.nifi.registry.flow.FlowRegistryPermissions;
+import org.apache.nifi.registry.flow.RegisteredFlow;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.RegisteredFlowVersionInfo;
+import org.apache.nifi.util.file.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    {
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+        objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL));
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+        .name("Directory")
+        .displayName("Directory")
+        .description("The root directory to store flows in")
+        .required(true)
+        .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+        .defaultValue("target/flow-registry-storage")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.singletonList(DIRECTORY);
+    }
+
+    /**
+     * Indicates whether the given storage location is a URL for a file located within the configured Directory.
+     */
+    @Override
+    public boolean isStorageLocationApplicable(final FlowRegistryClientConfigurationContext context, final String storageLocation) {
+        try {
+            final URL url = new URL(storageLocation);
+            final File file = new java.io.File(url.toURI());
+            final Path path = file.toPath().toAbsolutePath().normalize();
+
+            final String configuredDirectory = context.getProperty(DIRECTORY).getValue();
+            final Path rootPath = Paths.get(configuredDirectory).toAbsolutePath().normalize();
+            // Path.relativize() does not fail merely because one path lies outside the other, so check containment explicitly
+            return path.startsWith(rootPath);
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public Set<FlowRegistryBucket> getBuckets(final FlowRegistryClientConfigurationContext context) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final File[] children = rootDir.listFiles();
+        if (children == null) {
+            throw new IOException("Cannot get listing of directory " + rootDir.getAbsolutePath());
+        }
+
+        final Set<FlowRegistryBucket> buckets = Arrays.stream(children).map(this::toBucket).collect(Collectors.toSet());
+        return buckets;
+    }
+
+    /**
+     * Creates a bucket for the given directory, using the directory's name as both the bucket identifier and the bucket name.
+     *
+     * @param file the bucket's directory
+     * @return a bucket that permits read, write, and delete
+     */
+    private FlowRegistryBucket toBucket(final File file) {
+        final FlowRegistryBucket bucket = new FlowRegistryBucket();
+        bucket.setIdentifier(file.getName());
+        bucket.setName(file.getName());
+        bucket.setPermissions(createAllowAllPermissions());
+        return bucket;
+    }
+
+    private File getRootDirectory(final FlowRegistryClientConfigurationContext context) {
+        final String rootDirectory = context.getProperty(DIRECTORY).getValue();
+        if (rootDirectory == null) {
+            throw new IllegalStateException("Registry Client cannot be used, as Directory property has not been set");
+        }
+
+        return new File(rootDirectory);
+    }
+
+    @Override
+    public FlowRegistryBucket getBucket(final FlowRegistryClientConfigurationContext context, final String bucketId) {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final FlowRegistryBucket bucket = toBucket(bucketDir);
+        return bucket;
+    }
+
+    @Override
+    public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext context, final RegisteredFlow flow) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final String bucketId = flow.getBucketIdentifier();
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flow.getIdentifier());
+        Files.createDirectories(flowDir.toPath());
+
+        return flow;
+    }
+
+    @Override
+    public RegisteredFlow deregisterFlow(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flowId);
+
+        final File[] versionDirs = flowDir.listFiles();
+
+        final RegisteredFlow flow = new RegisteredFlow();
+        flow.setBucketIdentifier(bucketId);
+        flow.setBucketName(bucketId);
+        flow.setIdentifier(flowId);
+        flow.setLastModifiedTimestamp(flowDir.lastModified());
+        flow.setVersionCount(versionDirs == null ? 0 : versionDirs.length);
+
+        FileUtils.deleteFile(flowDir, true);
+        return flow;
+    }
+
+    @Override
+    public RegisteredFlow getFlow(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flowId);
+
+        final File[] versionDirs = flowDir.listFiles();
+
+        final RegisteredFlow flow = new RegisteredFlow();
+        flow.setBucketIdentifier(bucketId);
+        flow.setBucketName(bucketId);
+        flow.setIdentifier(flowId);
+        flow.setLastModifiedTimestamp(flowDir.lastModified());
+        flow.setVersionCount(versionDirs == null ? 0 : versionDirs.length);
+
+        return flow;
+    }
+
+    @Override
+    public Set<RegisteredFlow> getFlows(final FlowRegistryClientConfigurationContext context, final String bucketId) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final File[] flowDirs = bucketDir.listFiles();
+        if (flowDirs == null) {
+            throw new IOException("Could not get listing of directory " + bucketDir);
+        }
+
+        final Set<RegisteredFlow> registeredFlows = new HashSet<>();
+        for (final File flowDir : flowDirs) {
+            final RegisteredFlow flow = getFlow(context, bucketId, flowDir.getName());
+            registeredFlows.add(flow);
+        }
+
+        return registeredFlows;
+    }
+
+    @Override
+    public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flowId);
+        final File versionDir = new File(flowDir, String.valueOf(version));
+        final File snapshotFile = new File(versionDir, "snapshot.json");
+
+        final JsonFactory factory = new JsonFactory(objectMapper);
+        try (final JsonParser parser = factory.createParser(snapshotFile)) {
+            final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class);
+            return snapshot;
+        }
+    }
+
+    @Override
+    public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+        final String bucketId = metadata.getBucketIdentifier();
+        final String flowId = metadata.getFlowIdentifier();
+        final long version = metadata.getVersion();
+
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flowId);
+        final File versionDir = new File(flowDir, String.valueOf(version));
+
+        // Create the directory for the version, if it doesn't exist.
+        if (!versionDir.exists()) {
+            Files.createDirectories(versionDir.toPath());
+        }
+
+        final File snapshotFile = new File(versionDir, "snapshot.json");
+
+        final RegisteredFlowSnapshot fullyPopulated = fullyPopulate(flowSnapshot, flowDir);
+        final JsonFactory factory = new JsonFactory(objectMapper);
+        try (final JsonGenerator generator = factory.createGenerator(snapshotFile, JsonEncoding.UTF8)) {
+            generator.writeObject(fullyPopulated);
+        }
+
+        return fullyPopulated;
+    }
+
+    /**
+     * Copies the requested snapshot, creating an allow-all bucket and flow (each named after its identifier) when the request supplies none.
+     */
+    private RegisteredFlowSnapshot fullyPopulate(final RegisteredFlowSnapshot requested, final File flowDir) {
+        final RegisteredFlowSnapshot full = new RegisteredFlowSnapshot();
+        full.setExternalControllerServices(requested.getExternalControllerServices());
+        full.setFlowContents(requested.getFlowContents());
+        full.setFlowEncodingVersion(requested.getFlowEncodingVersion());
+        full.setParameterContexts(requested.getParameterContexts());
+        full.setParameterProviders(requested.getParameterProviders());
+        full.setSnapshotMetadata(requested.getSnapshotMetadata());
+
+        // Populate the bucket
+        final FlowRegistryBucket bucket;
+        if (requested.getBucket() == null) {
+            bucket = new FlowRegistryBucket();
+            bucket.setCreatedTimestamp(System.currentTimeMillis());
+            bucket.setDescription("Description");
+            bucket.setIdentifier(requested.getSnapshotMetadata().getBucketIdentifier());
+            bucket.setName(requested.getSnapshotMetadata().getBucketIdentifier());
+            bucket.setPermissions(createAllowAllPermissions());
+        } else {
+            bucket = requested.getBucket();
+        }
+        full.setBucket(bucket);
+
+        // Populate the flow
+        final RegisteredFlow flow;
+        if (requested.getFlow() == null) {
+            flow = new RegisteredFlow();
+            flow.setBucketIdentifier(requested.getSnapshotMetadata().getBucketIdentifier());
+            flow.setBucketName(requested.getSnapshotMetadata().getBucketIdentifier());
+            flow.setCreatedTimestamp(System.currentTimeMillis());
+            flow.setDescription("Description");
+            flow.setIdentifier(requested.getSnapshotMetadata().getFlowIdentifier());
+            flow.setName(requested.getSnapshotMetadata().getFlowIdentifier());
+            flow.setLastModifiedTimestamp(System.currentTimeMillis());
+            flow.setPermissions(createAllowAllPermissions());
+
+            final File[] flowVersionDirs = flowDir.listFiles();
+            final int versionCount = flowVersionDirs == null ? 0 : flowVersionDirs.length;
+            flow.setVersionCount(versionCount);
+            final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo();
+            versionInfo.setVersion(versionCount);
+            flow.setVersionInfo(versionInfo);
+        } else {
+            flow = requested.getFlow();
+        }
+        full.setFlow(flow);
+
+        return full;
+    }
+
+    private FlowRegistryPermissions createAllowAllPermissions() {
+        final FlowRegistryPermissions permissions = new FlowRegistryPermissions();
+        permissions.setCanWrite(true);
+        permissions.setCanRead(true);
+        permissions.setCanDelete(true);
+        return permissions;
+    }
+
+    @Override
+    public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flowId);
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Could not list directories of " + flowDir);
+        }
+
+        final Set<RegisteredFlowSnapshotMetadata> metadatas = new HashSet<>();
+        for (final File versionDir : versionDirs) {
+            final String versionName = versionDir.getName();
+
+            final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata();
+            metadata.setVersion(Integer.parseInt(versionName));
+            metadata.setTimestamp(versionDir.lastModified());
+            metadata.setFlowIdentifier(flowId);
+            metadata.setBucketIdentifier(bucketId);
+            metadata.setAuthor("System Test Author");
+            metadatas.add(metadata);
+        }
+
+        return metadatas;
+    }
+
+    @Override
+    public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
+        final File rootDir = getRootDirectory(context);
+        final File bucketDir = new File(rootDir, bucketId);
+        final File flowDir = new File(bucketDir, flowId);
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Cannot list directories of " + flowDir);
+        }
+
+        final OptionalInt greatestValue = Arrays.stream(versionDirs)
+            .map(File::getName)
+            .mapToInt(Integer::parseInt)
+            .max();
+        return greatestValue.orElse(-1);
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
new file mode 100644
index 0000000000..d3f329dd8a
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.flow.registry.FileSystemFlowRegistryClient
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index fb7f9948a5..b6f1c37623 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -30,6 +30,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.VersionsClient;
 import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
@@ -38,6 +39,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
 import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+import org.apache.nifi.web.api.dto.FlowRegistryClientDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ParameterContextDTO;
@@ -52,9 +54,12 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.SnippetDTO;
 import org.apache.nifi.web.api.dto.VariableDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.VerifyConfigRequestDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.dto.VersionedFlowDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
@@ -67,9 +72,12 @@ import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.NodeEntity;
 import org.apache.nifi.web.api.entity.ParameterContextEntity;
@@ -92,10 +100,14 @@ import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskRunStatusEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
+import org.apache.nifi.web.api.entity.SnippetEntity;
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
 import org.apache.nifi.web.api.entity.VerifyConfigRequestEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1552,4 +1564,109 @@ public class NiFiClientUtil {
 
         return nifiClient.getReportingTasksClient().updateReportingTask(entity);
     }
+
+    public FlowRegistryClientEntity createFlowRegistryClient(final String name) throws NiFiClientException, IOException {
+        final BundleDTO bundleDto = new BundleDTO(NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
+        final FlowRegistryClientDTO clientDto = new FlowRegistryClientDTO();
+        clientDto.setBundle(bundleDto);
+        clientDto.setName(name);
+        clientDto.setType("org.apache.nifi.flow.registry.FileSystemFlowRegistryClient");
+
+        final FlowRegistryClientEntity clientEntity = new FlowRegistryClientEntity();
+        clientEntity.setComponent(clientDto);
+        clientEntity.setRevision(createNewRevision());
+
+        return nifiClient.getControllerClient().createRegistryClient(clientEntity);
+    }
+
+    public FlowRegistryClientEntity updateRegistryClientProperties(final FlowRegistryClientEntity currentEntity, final Map<String, String> properties) throws NiFiClientException, IOException {
+        final FlowRegistryClientDTO updateDto = new FlowRegistryClientDTO();
+        updateDto.setProperties(properties);
+        updateDto.setId(currentEntity.getId());
+
+        final FlowRegistryClientEntity updateEntity = new FlowRegistryClientEntity();
+        updateEntity.setRevision(currentEntity.getRevision());
+        updateEntity.setId(currentEntity.getId());
+        updateEntity.setComponent(updateDto);
+
+        return nifiClient.getControllerClient().updateRegistryClient(updateEntity);
+    }
+
+    public VersionControlInformationEntity startVersionControl(final ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final String bucketId, final String flowName)
+            throws NiFiClientException, IOException{
+
+        final VersionedFlowDTO versionedFlowDto = new VersionedFlowDTO();
+        versionedFlowDto.setBucketId(bucketId);
+        versionedFlowDto.setFlowName(flowName);
+        versionedFlowDto.setRegistryId(registryClient.getId());
+        versionedFlowDto.setAction(VersionedFlowDTO.COMMIT_ACTION);
+
+        final StartVersionControlRequestEntity requestEntity = new StartVersionControlRequestEntity();
+        requestEntity.setProcessGroupRevision(group.getRevision());
+        requestEntity.setVersionedFlow(versionedFlowDto);
+
+        return nifiClient.getVersionsClient().startVersionControl(group.getId(), requestEntity);
+    }
+
+    public VersionedFlowUpdateRequestEntity revertChanges(final ProcessGroupEntity group) throws NiFiClientException, IOException, InterruptedException {
+        final VersionControlInformationEntity vciEntity = nifiClient.getVersionsClient().getVersionControlInfo(group.getId());
+        final VersionedFlowUpdateRequestEntity revertRequest = nifiClient.getVersionsClient().initiateRevertFlowVersion(group.getId(), vciEntity);
+        return waitForFlowRevertCompleted(revertRequest.getRequest().getRequestId());
+    }
+
+    private VersionedFlowUpdateRequestEntity waitForFlowRevertCompleted(final String requestId) throws NiFiClientException, IOException, InterruptedException {
+        final VersionsClient versionsClient = nifiClient.getVersionsClient();
+
+        while(true) {
+            final VersionedFlowUpdateRequestEntity entity = versionsClient.getRevertFlowVersionRequest(requestId);
+
+            if (entity.getRequest().isComplete()) {
+                return entity;
+            }
+
+            Thread.sleep(100L);
+        }
+    }
+
+    public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final VersionControlInformationDTO vciDto)
+        throws NiFiClientException, IOException {
+        return importFlowFromRegistry(parentGroupId, vciDto.getRegistryId(), vciDto.getBucketId(), vciDto.getFlowId(), vciDto.getVersion());
+    }
+
+    public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final String registryClientId, final String bucketId, final String flowId, final int version)
+                throws NiFiClientException, IOException {
+
+        final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+        vci.setBucketId(bucketId);
+        vci.setFlowId(flowId);
+        vci.setVersion(version);
+        vci.setRegistryId(registryClientId);
+
+        final ProcessGroupDTO processGroupDto = new ProcessGroupDTO();
+        processGroupDto.setVersionControlInformation(vci);
+
+        final ProcessGroupEntity groupEntity = new ProcessGroupEntity();
+        groupEntity.setComponent(processGroupDto);
+        groupEntity.setRevision(createNewRevision());
+
+        return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity);
+    }
+
+    public FlowEntity copyAndPaste(final ProcessGroupEntity pgEntity, final String destinationGroupId) throws NiFiClientException, IOException {
+        final SnippetDTO snippetDto = new SnippetDTO();
+        snippetDto.setProcessGroups(Collections.singletonMap(pgEntity.getId(), pgEntity.getRevision()));
+        snippetDto.setParentGroupId(pgEntity.getComponent().getParentGroupId());
+
+        final SnippetEntity snippetEntity = new SnippetEntity();
+        snippetEntity.setSnippet(snippetDto);
+
+        final SnippetEntity createdSnippetEntity = nifiClient.getSnippetClient().createSnippet(snippetEntity);
+
+        final CopySnippetRequestEntity requestEntity = new CopySnippetRequestEntity();
+        requestEntity.setOriginX(0D);
+        requestEntity.setOriginY(0D);
+        requestEntity.setSnippetId(createdSnippetEntity.getSnippet().getId());
+
+        return nifiClient.getProcessGroupClient().copySnippet(destinationGroupId, requestEntity);
+    }
 }
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
new file mode 100644
index 0000000000..5f17db36f5
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.registry;
+
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.entity.FlowEntity;
+import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class RegistryClientIT extends NiFiSystemIT {
+    @Test
+    public void testStartVersionControlThenImport() throws NiFiClientException, IOException {
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final ProcessGroupEntity group = getClientUtil().createProcessGroup("Outer", "root");
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", group.getId());
+
+        final VersionControlInformationEntity vci = getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First Flow");
+
+        final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", vci.getVersionControlInformation());
+        assertNotNull(imported);
+
+        final ProcessGroupFlowDTO importedFlow = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow();
+        final FlowDTO importedGroupContents = importedFlow.getFlow();
+        final Set<ProcessorEntity> importedProcessors = importedGroupContents.getProcessors();
+        assertEquals(1, importedProcessors.size());
+
+        final ProcessorDTO importedProcessor = importedProcessors.iterator().next().getComponent();
+        assertEquals(terminate.getComponent().getType(), importedProcessor.getType());
+        assertEquals(terminate.getComponent().getName(), importedProcessor.getName());
+        assertNotEquals(terminate.getComponent().getId(), importedProcessor.getId());
+    }
+
+    private FlowRegistryClientEntity registerClient() throws NiFiClientException, IOException {
+        final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient("FileRegistry");
+        // Strip any parenthesized suffix such as "()" from the test name. replaceAll is required: String.replace treats its argument as a literal, not a regex.
+        final File storageDir = new File("target/flowRegistryStorage/" + getTestName().replaceAll("\\(.*?\\)", ""));
+        Files.createDirectories(storageDir.toPath());
+        getClientUtil().updateRegistryClientProperties(clientEntity, Collections.singletonMap("Directory", storageDir.getAbsolutePath()));
+        return clientEntity;
+    }
+
+    @Test
+    public void testStartVersionControlThenModifyAndRevert() throws NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final ProcessGroupEntity group = getClientUtil().createProcessGroup("Outer", "root");
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", group.getId());
+
+        final VersionControlInformationEntity vci = getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First Flow");
+
+        String versionedFlowState = getVersionedFlowState(group.getId(), "root");
+        assertEquals("UP_TO_DATE", versionedFlowState);
+
+        getClientUtil().updateProcessorExecutionNode(terminate, ExecutionNode.PRIMARY);
+        versionedFlowState = getVersionedFlowState(group.getId(), "root");
+        assertEquals("LOCALLY_MODIFIED", versionedFlowState);
+
+        final ProcessorEntity locallyModifiedTerminate = getNifiClient().getProcessorClient().getProcessor(terminate.getId());
+        assertEquals(ExecutionNode.PRIMARY.name(), locallyModifiedTerminate.getComponent().getConfig().getExecutionNode());
+
+        getClientUtil().revertChanges(group);
+
+        final ProcessorEntity updatedTerminate = getNifiClient().getProcessorClient().getProcessor(terminate.getId());
+        assertEquals(ExecutionNode.ALL.name(), updatedTerminate.getComponent().getConfig().getExecutionNode());
+
+        versionedFlowState = getVersionedFlowState(group.getId(), "root");
+        assertEquals("UP_TO_DATE", versionedFlowState);
+    }
+
+    private String getVersionedFlowState(final String groupId, final String parentGroupId) throws NiFiClientException, IOException {
+        final ProcessGroupFlowDTO parentGroup = getNifiClient().getFlowClient().getProcessGroup(parentGroupId).getProcessGroupFlow();
+        final Set<ProcessGroupEntity> childGroups = parentGroup.getFlow().getProcessGroups();
+
+        return childGroups.stream()
+            .filter(childGroup -> groupId.equals(childGroup.getId()))
+            .map(ProcessGroupEntity::getVersionedFlowState)
+            .findAny()
+            .orElse(null);
+    }
+
+    @Test
+    public void testCopyPasteProcessGroupDoesNotDuplicateVersionedComponentId() throws NiFiClientException, IOException {
+        // Create a top-level PG and version it with nothing in it.
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final ProcessGroupEntity outerGroup = getClientUtil().createProcessGroup("Outer", "root");
+        getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket", "First Flow");
+
+        // Create a lower level PG and add a Processor.
+        // Commit as Version 2 of the group.
+        final ProcessGroupEntity inner1 = getClientUtil().createProcessGroup("Inner 1", outerGroup.getId());
+        ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", inner1.getId());
+        VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket", "First Flow");
+        assertEquals(2, vciEntity.getVersionControlInformation().getVersion());
+
+        // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id
+        terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId());
+        assertNotNull(terminate1.getComponent().getVersionedComponentId());
+
+        // Copy and paste the inner Process Group
+        final FlowEntity flowEntity = getClientUtil().copyAndPaste(inner1, outerGroup.getId());
+        final ProcessGroupEntity inner2Entity = flowEntity.getFlow().getProcessGroups().iterator().next();
+
+        final ProcessGroupFlowEntity inner2FlowEntity = getNifiClient().getFlowClient().getProcessGroup(inner2Entity.getId());
+        final Set<ProcessorEntity> inner2FlowProcessors = inner2FlowEntity.getProcessGroupFlow().getFlow().getProcessors();
+        assertEquals(1, inner2FlowProcessors.size());
+
+        ProcessorEntity terminate2 = inner2FlowProcessors.iterator().next();
+        assertEquals(terminate1.getComponent().getName(), terminate2.getComponent().getName());
+        assertEquals(terminate1.getComponent().getType(), terminate2.getComponent().getType());
+        assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId());
+        assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId());
+
+        // Commit to Version Control again with the newly created components
+        vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket", "First Flow");
+        assertEquals(3, vciEntity.getVersionControlInformation().getVersion());
+
+        // Get new version of terminate2 processor and terminate1 processor. Ensure that both have Versioned Component IDs but that they are different.
+        terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId());
+        terminate2 = getNifiClient().getProcessorClient().getProcessor(terminate2.getId());
+
+        assertNotNull(terminate1.getComponent().getVersionedComponentId());
+        assertNotNull(terminate2.getComponent().getVersionedComponentId());
+        assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId());
+    }
+
+    @Test
+    public void testCopyPasteProcessGroupUnderVersionControlMaintainsVersionedComponentId() throws NiFiClientException, IOException, InterruptedException {
+        // Create a top-level PG. It is not placed under version control itself.
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final ProcessGroupEntity topLevel1 = getClientUtil().createProcessGroup("Top Level 1", "root");
+
+        // Create a lower level PG and add a Processor.
+        // Commit the lower level PG as Version 1 of a new flow.
+        final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId());
+        ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId());
+        VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, "First Bucket", "First Flow");
+        assertEquals(1, vciEntity.getVersionControlInformation().getVersion());
+
+        // Now that the inner group is under version control, copy it and paste it to a new PG.
+        // This should result in the pasted Process Group having a processor with the same Versioned Component ID, because the Processors
+        // have different Versioned groups, so they can have duplicate Versioned Component IDs.
+        final ProcessGroupEntity topLevel2 = getClientUtil().createProcessGroup("Top Level 2", "root");
+        final FlowEntity flowEntity = getClientUtil().copyAndPaste(innerGroup, topLevel2.getId());
+        final String pastedGroupId = flowEntity.getFlow().getProcessGroups().iterator().next().getId();
+        final ProcessGroupFlowEntity pastedGroupFlowEntity = getNifiClient().getFlowClient().getProcessGroup(pastedGroupId);
+        final ProcessorEntity terminate2 = pastedGroupFlowEntity.getProcessGroupFlow().getFlow().getProcessors().iterator().next();
+
+        // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id
+        terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId());
+        assertNotNull(terminate1.getComponent().getVersionedComponentId());
+
+        // Both the pasted Process Group and the original should have the same Version Control Information.
+        final VersionControlInformationDTO originalGroupVci = getNifiClient().getProcessGroupClient().getProcessGroup(innerGroup.getId()).getComponent().getVersionControlInformation();
+        final VersionControlInformationDTO pastedGroupVci = getNifiClient().getProcessGroupClient().getProcessGroup(pastedGroupId).getComponent().getVersionControlInformation();
+        assertNotNull(originalGroupVci);
+        assertNotNull(pastedGroupVci);
+        assertEquals(originalGroupVci.getBucketId(), pastedGroupVci.getBucketId());
+        assertEquals(originalGroupVci.getFlowId(), pastedGroupVci.getFlowId());
+        assertEquals(originalGroupVci.getVersion(), pastedGroupVci.getVersion());
+
+        // Wait for the Version Control Information to show a state of UP_TO_DATE. We have to wait for this because it initially is set to SYNC_FAILURE and a background task
+        // is kicked off to determine the state.
+        waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getVersionControlState(innerGroup.getId())) );
+        waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getVersionControlState(pastedGroupId)) );
+
+        // The two processors should have the same Versioned Component ID
+        assertEquals(terminate1.getComponent().getName(), terminate2.getComponent().getName());
+        assertEquals(terminate1.getComponent().getType(), terminate2.getComponent().getType());
+        assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId());
+        assertEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId());
+    }
+
+    private String getVersionControlState(final String groupId) {
+        try {
+            final VersionControlInformationDTO vci = getNifiClient().getProcessGroupClient().getProcessGroup(groupId).getComponent().getVersionControlInformation();
+            return vci.getState();
+        } catch (final Exception e) {
+            Assertions.fail("Could not obtain Version Control Information for Group with ID " + groupId, e);
+            return null;
+        }
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
index 7a9f612c26..04e813845d 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
@@ -40,6 +40,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.SnippetClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TenantsClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.VersionsClient;
@@ -382,6 +383,16 @@ public class NiFiClientFactory implements ClientFactory<NiFiClient> {
             return wrappedClient.getAccessClient();
         }
 
+        @Override
+        public SnippetClient getSnippetClient() {
+            return wrappedClient.getSnippetClient();
+        }
+
+        @Override
+        public SnippetClient getSnippetClient(final RequestConfig requestConfig) {
+            return wrappedClient.getSnippetClient(requestConfig);
+        }
+
         @Override
         public void close() throws IOException {
             wrappedClient.close();
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
index 92c42f51f8..47018b2ea2 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
@@ -135,6 +135,12 @@ public interface NiFiClient extends Closeable {
 
     AccessClient getAccessClient();
 
+    // ----- SnippetClient -----
+
+    SnippetClient getSnippetClient();
+
+    SnippetClient getSnippetClient(RequestConfig requestConfig);
+
     /**
      * The builder interface that implementations should provide for obtaining the client.
      */
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
index 7204e3016a..5d954e3f0b 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
@@ -18,6 +18,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
 
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
+import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
@@ -65,4 +67,7 @@ public interface ProcessGroupClient {
     ProcessGroupReplaceRequestEntity deleteProcessGroupReplaceRequest(String processGroupId, String requestId)
             throws NiFiClientException, IOException;
 
+    FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnippetRequestEntity)
+        throws NiFiClientException, IOException;
+
 }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/SnippetClient.java
similarity index 55%
copy from nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java
copy to nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/SnippetClient.java
index 61e782cbf2..37247a616f 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/SnippetClient.java
@@ -14,22 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.toolkit.cli.impl.client.nifi;
 
-import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
-import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
+import org.apache.nifi.web.api.entity.SnippetEntity;
 
 import java.io.IOException;
 
-public interface VersionsClient {
-
-    VersionControlInformationEntity getVersionControlInfo(String processGroupId) throws IOException, NiFiClientException;
-
-    VersionedFlowUpdateRequestEntity updateVersionControlInfo(String processGroupId, VersionControlInformationEntity entity)
-            throws IOException, NiFiClientException;
-
-    VersionedFlowUpdateRequestEntity getUpdateRequest(String updateRequestId) throws IOException, NiFiClientException;
-
-    VersionedFlowUpdateRequestEntity deleteUpdateRequest(String updateRequestId) throws IOException, NiFiClientException;
-
+public interface SnippetClient {
+    /**
+     * Creates the given snippet on the NiFi server
+     * @param snippet the snippet to create
+     * @return the created entity
+     */
+    SnippetEntity createSnippet(SnippetEntity snippet) throws NiFiClientException, IOException;
 }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java
index 61e782cbf2..26ced81348 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/VersionsClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.toolkit.cli.impl.client.nifi;
 
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
 
@@ -32,4 +33,12 @@ public interface VersionsClient {
 
     VersionedFlowUpdateRequestEntity deleteUpdateRequest(String updateRequestId) throws IOException, NiFiClientException;
 
+    /**
+     * Starts version control for the Process Group with the given id.
+     *
+     * @param processGroupId the id of the Process Group to place under version control
+     * @param startVersionControlRequestEntity the request entity describing how version control should be started
+     * @return the version control information returned for the Process Group
+     * @throws IOException declared failure type; presumably raised on communication errors (confirm against the implementation)
+     * @throws NiFiClientException declared failure type; presumably raised when the request is rejected (confirm against the implementation)
+     */
+    VersionControlInformationEntity startVersionControl(String processGroupId, StartVersionControlRequestEntity startVersionControlRequestEntity) throws IOException, NiFiClientException;
+
+    /**
+     * Initiates a request to revert the flow version of the Process Group with the given id.
+     *
+     * @param processGroupId the id of the Process Group to revert
+     * @param versionControlInformation the version control information sent with the revert request
+     * @return the entity describing the revert request that was created
+     * @throws IOException declared failure type; presumably raised on communication errors (confirm against the implementation)
+     * @throws NiFiClientException declared failure type; presumably raised when the request is rejected (confirm against the implementation)
+     */
+    VersionedFlowUpdateRequestEntity initiateRevertFlowVersion(String processGroupId, VersionControlInformationEntity versionControlInformation) throws IOException, NiFiClientException;
+
+    /**
+     * Retrieves the revert request with the given id.
+     *
+     * @param requestId the id of the revert request
+     * @return the entity describing the revert request
+     * @throws IOException declared failure type; presumably raised on communication errors (confirm against the implementation)
+     * @throws NiFiClientException declared failure type; presumably raised when the request is rejected (confirm against the implementation)
+     */
+    VersionedFlowUpdateRequestEntity getRevertFlowVersionRequest(String requestId) throws IOException, NiFiClientException;
+
+    /**
+     * Deletes the revert request with the given id.
+     *
+     * @param requestId the id of the revert request
+     * @return the entity returned for the deleted revert request
+     * @throws IOException declared failure type; presumably raised on communication errors (confirm against the implementation)
+     * @throws NiFiClientException declared failure type; presumably raised when the request is rejected (confirm against the implementation)
+     */
+    VersionedFlowUpdateRequestEntity deleteRevertFlowVersionRequest(String requestId) throws IOException, NiFiClientException;
+
 }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
index 3f0e10325a..edc153cb74 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
@@ -41,6 +41,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.SnippetClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TenantsClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.VersionsClient;
@@ -302,6 +303,16 @@ public class JerseyNiFiClient implements NiFiClient {
         return new JerseyAccessClient(baseTarget);
     }
 
+    /**
+     * Returns a new {@link JerseySnippetClient} built on this client's base target,
+     * without any request configuration.
+     */
+    @Override
+    public SnippetClient getSnippetClient() {
+        return new JerseySnippetClient(baseTarget);
+    }
+
+    /**
+     * Returns a new {@link JerseySnippetClient} built on this client's base target
+     * that uses the given request configuration.
+     *
+     * @param requestConfig the request configuration passed to the new client
+     */
+    @Override
+    public SnippetClient getSnippetClient(final RequestConfig requestConfig) {
+        return new JerseySnippetClient(baseTarget, requestConfig);
+    }
+
     @Override
     public void close() {
         if (this.client != null) {
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
index 6ac9d689e1..5bf9b5fccc 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
@@ -22,6 +22,8 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
+import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
@@ -309,4 +311,26 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr
             return getRequestBuilder(target).delete(ProcessGroupReplaceRequestEntity.class);
         });
     }
+
+    /**
+     * Copies a snippet into a Process Group by POSTing the request entity as JSON to
+     * {@code {id}/snippet-instance} beneath the process groups target.
+     *
+     * @param processGroupId the id of the destination Process Group; must not be null or blank
+     * @param copySnippetRequestEntity the copy request to send; must not be null
+     * @return the flow entity returned for the request
+     * @throws IllegalArgumentException if the id is blank or the request entity is null
+     * @throws NiFiClientException propagated from {@code executeAction}
+     * @throws IOException propagated from {@code executeAction}
+     */
+    @Override
+    public FlowEntity copySnippet(final String processGroupId, final CopySnippetRequestEntity copySnippetRequestEntity) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(processGroupId)) {
+            throw new IllegalArgumentException("Process group id cannot be null or blank");
+        }
+        if (copySnippetRequestEntity == null) {
+            throw new IllegalArgumentException("Snippet Request Entity cannot be null");
+        }
+
+        return executeAction("Error copying snippet to Process Group", () -> {
+            final WebTarget snippetInstanceTarget = processGroupsTarget.path("{id}/snippet-instance").resolveTemplate("id", processGroupId);
+            final Entity<CopySnippetRequestEntity> requestBody = Entity.entity(copySnippetRequestEntity, MediaType.APPLICATION_JSON_TYPE);
+            return getRequestBuilder(snippetInstanceTarget).post(requestBody, FlowEntity.class);
+        });
+    }
+
 }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseySnippetClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseySnippetClient.java
new file mode 100644
index 0000000000..edefba7aab
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseySnippetClient.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.cli.impl.client.nifi.impl;
+
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.SnippetClient;
+import org.apache.nifi.web.api.entity.SnippetEntity;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+
+/**
+ * Jersey-based {@link SnippetClient} that issues its requests against the
+ * {@code /snippets} path appended to the supplied base target.
+ */
+public class JerseySnippetClient extends AbstractJerseyClient implements SnippetClient {
+    private final WebTarget snippetTarget;
+
+    /**
+     * Creates a client without any request configuration.
+     *
+     * @param baseTarget the base target that the {@code /snippets} path is appended to
+     */
+    public JerseySnippetClient(final WebTarget baseTarget) {
+        this(baseTarget, null);
+    }
+
+    /**
+     * Creates a client that uses the given request configuration.
+     *
+     * @param baseTarget the base target that the {@code /snippets} path is appended to
+     * @param requestConfig the request configuration handed to the superclass; may be {@code null}
+     */
+    public JerseySnippetClient(final WebTarget baseTarget, final RequestConfig requestConfig) {
+        super(requestConfig);
+        this.snippetTarget = baseTarget.path("/snippets");
+    }
+
+    /**
+     * Creates the given snippet by POSTing it as JSON to the snippets target.
+     *
+     * @param snippet the snippet to create; must not be null
+     * @return the created entity
+     * @throws IllegalArgumentException if {@code snippet} is null
+     * @throws NiFiClientException propagated from {@code executeAction}
+     * @throws IOException propagated from {@code executeAction}
+     */
+    @Override
+    public SnippetEntity createSnippet(final SnippetEntity snippet) throws NiFiClientException, IOException {
+        if (snippet == null) {
+            throw new IllegalArgumentException("Snippet entity cannot be null");
+        }
+
+        // Use the typed media-type constant, matching the other Jersey clients in this module.
+        return executeAction("Error creating snippet", () ->
+            getRequestBuilder(snippetTarget).post(
+                Entity.entity(snippet, MediaType.APPLICATION_JSON_TYPE),
+                SnippetEntity.class
+            ));
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyVersionsClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyVersionsClient.java
index b145b38307..4949ed1f32 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyVersionsClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyVersionsClient.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.VersionsClient;
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
 
@@ -119,4 +120,82 @@ public class JerseyVersionsClient extends AbstractJerseyClient implements Versio
             return getRequestBuilder(target).delete(VersionedFlowUpdateRequestEntity.class);
         });
     }
+
+    /**
+     * Starts version control for a Process Group by POSTing the request entity as JSON to
+     * {@code process-groups/{id}} beneath the versions target.
+     *
+     * @param processGroupId the id of the Process Group; must not be null or blank
+     * @param startVersionControlRequestEntity the request entity to send; must not be null
+     * @return the version control information returned for the request
+     * @throws IllegalArgumentException if the id is blank or the request entity is null
+     * @throws IOException propagated from {@code executeAction}
+     * @throws NiFiClientException propagated from {@code executeAction}
+     */
+    @Override
+    public VersionControlInformationEntity startVersionControl(final String processGroupId, final StartVersionControlRequestEntity startVersionControlRequestEntity)
+                throws IOException, NiFiClientException {
+
+        // Validate the id up front, as the other Process Group operations do, so a blank id
+        // fails fast instead of producing a malformed request path.
+        if (StringUtils.isBlank(processGroupId)) {
+            throw new IllegalArgumentException("Process group id cannot be null or blank");
+        }
+
+        if (startVersionControlRequestEntity == null) {
+            throw new IllegalArgumentException("Request Entity cannot be null");
+        }
+
+        return executeAction("Error starting version control", () -> {
+            final WebTarget target = versionsTarget
+                .path("process-groups/{id}")
+                .resolveTemplate("id", processGroupId);
+
+            return getRequestBuilder(target).post(Entity.entity(startVersionControlRequestEntity, MediaType.APPLICATION_JSON_TYPE),
+                VersionControlInformationEntity.class);
+        });
+    }
+
+    // POST /versions/revert-requests/process-groups/id
+
+    /**
+     * Initiates a revert of a Process Group's flow version by POSTing the version control
+     * information as JSON to {@code revert-requests/process-groups/{id}} beneath the versions target.
+     *
+     * @param processGroupId the id of the Process Group; must not be null or blank
+     * @param versionControlInformation the version control information to send; must not be null
+     * @return the entity returned for the newly created revert request
+     * @throws IllegalArgumentException if the id is blank or the entity is null
+     * @throws IOException propagated from {@code executeAction}
+     * @throws NiFiClientException propagated from {@code executeAction}
+     */
+    @Override
+    public VersionedFlowUpdateRequestEntity initiateRevertFlowVersion(final String processGroupId, final VersionControlInformationEntity versionControlInformation)
+        throws IOException, NiFiClientException {
+        if (StringUtils.isBlank(processGroupId)) {
+            throw new IllegalArgumentException("Process group id cannot be null or blank");
+        }
+        if (versionControlInformation == null) {
+            throw new IllegalArgumentException("Version control information entity cannot be null");
+        }
+
+        return executeAction("Error reverting flow version", () -> {
+            final WebTarget revertTarget = versionsTarget.path("revert-requests/process-groups/{id}").resolveTemplate("id", processGroupId);
+            final Entity<VersionControlInformationEntity> requestBody = Entity.entity(versionControlInformation, MediaType.APPLICATION_JSON_TYPE);
+            return getRequestBuilder(revertTarget).post(requestBody, VersionedFlowUpdateRequestEntity.class);
+        });
+    }
+
+    // GET /versions/revert-requests/id
+
+    /**
+     * Fetches a revert request by issuing a GET to {@code revert-requests/{id}} beneath the versions target.
+     *
+     * @param requestId the id of the revert request; must not be null or blank
+     * @return the entity returned for the revert request
+     * @throws IllegalArgumentException if {@code requestId} is null or blank
+     * @throws IOException propagated from {@code executeAction}
+     * @throws NiFiClientException propagated from {@code executeAction}
+     */
+    @Override
+    public VersionedFlowUpdateRequestEntity getRevertFlowVersionRequest(final String requestId) throws IOException, NiFiClientException {
+        if (StringUtils.isBlank(requestId)) {
+            throw new IllegalArgumentException("Update request id cannot be null or blank");
+        }
+
+        return executeAction("Error getting revert request", () ->
+            getRequestBuilder(versionsTarget.path("revert-requests/{id}").resolveTemplate("id", requestId))
+                .get(VersionedFlowUpdateRequestEntity.class));
+    }
+
+    // DELETE /versions/revert-requests/id
+
+    /**
+     * Deletes a revert request by issuing a DELETE to {@code revert-requests/{id}} beneath the versions target.
+     *
+     * @param requestId the id of the revert request; must not be null or blank
+     * @return the entity returned for the deleted revert request
+     * @throws IllegalArgumentException if {@code requestId} is null or blank
+     * @throws IOException propagated from {@code executeAction}
+     * @throws NiFiClientException propagated from {@code executeAction}
+     */
+    @Override
+    public VersionedFlowUpdateRequestEntity deleteRevertFlowVersionRequest(final String requestId) throws IOException, NiFiClientException {
+        if (StringUtils.isBlank(requestId)) {
+            throw new IllegalArgumentException("Update request id cannot be null or blank");
+        }
+
+        return executeAction("Error deleting revert request", () ->
+            getRequestBuilder(versionsTarget.path("revert-requests/{id}").resolveTemplate("id", requestId))
+                .delete(VersionedFlowUpdateRequestEntity.class));
+    }
 }