You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:13:58 UTC

[10/50] nifi git commit: NIFI-4436: - Initial checkpoint: able to start version control and detect changes, in standalone mode, still 'crude' implementation - Checkpoint: Can place flow under version control and can determine if modified - Checkpoint: Ch

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
new file mode 100644
index 0000000..4b87b50
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RequestManager} that runs each submitted request on a background thread pool and tracks the
+ * requests in memory, keyed by the combination of request type and request ID.
+ *
+ * <p>Completed requests are purged lazily: every call to
+ * {@link #submitRequest(String, String, AsynchronousWebRequest, Consumer)} first drops any completed request
+ * whose last-updated time is older than the configured expiration.</p>
+ *
+ * @param <T> the type of result produced by a request
+ */
+public class AsyncRequestManager<T> implements RequestManager<T> {
+    private static final Logger logger = LoggerFactory.getLogger(AsyncRequestManager.class);
+
+    private final long requestExpirationMillis;
+    private final int maxConcurrentRequests;
+    private final ConcurrentMap<String, AsynchronousWebRequest<T>> requests = new ConcurrentHashMap<>();
+
+    private final ExecutorService threadPool;
+
+
+    /**
+     * Creates a manager backed by a pool of daemon threads.
+     *
+     * <p>NOTE: a ThreadPoolExecutor only creates threads beyond its core size (1 here) once its work queue is
+     * full, so tasks run on a single thread until {@code maxConcurrentRequests} tasks are waiting in the queue.</p>
+     *
+     * @param maxConcurrentRequests the maximum number of requests that may be tracked at once; also used as the
+     *            capacity of the work queue
+     * @param requestExpirationMillis how long, in milliseconds, a completed request is retained before it may be purged
+     * @param threadNamePrefix the prefix used when naming the background threads
+     */
+    public AsyncRequestManager(final int maxConcurrentRequests, final long requestExpirationMillis, final String threadNamePrefix) {
+        this.requestExpirationMillis = requestExpirationMillis;
+        this.maxConcurrentRequests = maxConcurrentRequests;
+
+        this.threadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<Runnable>(maxConcurrentRequests),
+            new ThreadFactory() {
+                private final AtomicLong counter = new AtomicLong(0L);
+
+                @Override
+                public Thread newThread(final Runnable r) {
+                    final Thread thread = Executors.defaultThreadFactory().newThread(r);
+                    thread.setName(threadNamePrefix + "-" + counter.incrementAndGet());
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+
+    }
+
+    /**
+     * Builds the composite map key for a request.
+     */
+    private String getKey(final String type, final String request) {
+        return type + "/" + request;
+    }
+
+    /**
+     * Removes every completed request whose last-updated time is older than the configured expiration.
+     */
+    private void purgeExpiredRequests() {
+        // Snapshot the keys to remove first and then remove them one at a time, so that the map is
+        // not modified while the stream over its entries is still being evaluated.
+        final Date expirationCutoff = new Date(System.currentTimeMillis() - requestExpirationMillis);
+        final List<String> expiredKeys = requests.entrySet().stream()
+            .filter(entry -> entry.getValue().isComplete())
+            .filter(entry -> entry.getValue().getLastUpdated().before(expirationCutoff))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+        expiredKeys.forEach(key -> requests.remove(key));
+    }
+
+    /**
+     * Registers the request and schedules the task to run in the background.
+     *
+     * @throws NullPointerException if any argument is null
+     * @throws IllegalStateException if the maximum number of requests is already being tracked
+     * @throws IllegalArgumentException if a request with the same type and ID is already being tracked
+     */
+    @Override
+    public void submitRequest(final String type, final String requestId, final AsynchronousWebRequest<T> request, final Consumer<AsynchronousWebRequest<T>> task) {
+        Objects.requireNonNull(type);
+        Objects.requireNonNull(requestId);
+        Objects.requireNonNull(request);
+        Objects.requireNonNull(task);
+
+        // before adding to the request map, purge any old requests
+        purgeExpiredRequests();
+
+        // '>=' so that at most maxConcurrentRequests are ever tracked once the new one is added
+        final int requestCount = requests.size();
+        if (requestCount >= maxConcurrentRequests) {
+            throw new IllegalStateException("There are already " + requestCount + " requests outstanding. "
+                + "Cannot issue any more requests until the older ones are deleted or expire");
+        }
+
+        final String key = getKey(type, requestId);
+        final AsynchronousWebRequest<T> existing = this.requests.putIfAbsent(key, request);
+        if (existing != null) {
+            throw new IllegalArgumentException("A request already exists with this ID and type");
+        }
+
+        try {
+            threadPool.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        task.accept(request);
+                    } catch (final Exception e) {
+                        logger.error("Failed to perform asynchronous task", e);
+                        // Refresh the timestamp before marking the request as failed so that a concurrent purge
+                        // does not see a completed request that still carries a stale last-updated time.
+                        request.setLastUpdated(new Date());
+                        request.setFailureReason("Encountered unexpected error when performing asynchronous task: " + e);
+                    }
+                }
+            });
+        } catch (final RuntimeException e) {
+            // The task was never scheduled (e.g., the pool rejected it), so the request would never complete and
+            // could never be purged or removed. Roll back the registration before propagating the failure.
+            requests.remove(key, request);
+            throw e;
+        }
+    }
+
+
+    /**
+     * Removes and returns a completed request.
+     *
+     * @throws NullPointerException if any argument is null
+     * @throws ResourceNotFoundException if no request is tracked for the given type and ID
+     * @throws IllegalArgumentException if the given user is not the user that submitted the request
+     * @throws IllegalStateException if the request has not yet completed
+     */
+    @Override
+    public AsynchronousWebRequest<T> removeRequest(final String type, final String id, final NiFiUser user) {
+        Objects.requireNonNull(type);
+        Objects.requireNonNull(id);
+        Objects.requireNonNull(user);
+
+        final String key = getKey(type, id);
+        final AsynchronousWebRequest<T> request = requests.get(key);
+        if (request == null) {
+            throw new ResourceNotFoundException("Could not find a Request with identifier " + id);
+        }
+
+        if (!request.getUser().equals(user)) {
+            throw new IllegalArgumentException("Only the user that submitted the update request can delete it.");
+        }
+
+        if (!request.isComplete()) {
+            throw new IllegalStateException("Cannot remove the request because it is not yet complete");
+        }
+
+        return requests.remove(key);
+    }
+
+    /**
+     * Looks up a tracked request without removing it.
+     *
+     * @throws NullPointerException if any argument is null
+     * @throws ResourceNotFoundException if no request is tracked for the given type and ID
+     * @throws IllegalArgumentException if the given user is not the user that submitted the request
+     */
+    @Override
+    public AsynchronousWebRequest<T> getRequest(final String type, final String id, final NiFiUser user) {
+        Objects.requireNonNull(type);
+        Objects.requireNonNull(id);
+        Objects.requireNonNull(user);
+
+        final String key = getKey(type, id);
+        final AsynchronousWebRequest<T> request = requests.get(key);
+        if (request == null) {
+            throw new ResourceNotFoundException("Could not find a Request with identifier " + id);
+        }
+
+        if (!request.getUser().equals(user)) {
+            throw new IllegalArgumentException("Only the user that submitted the request can retrieve it.");
+        }
+
+        return request;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
new file mode 100644
index 0000000..d09f895
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.Date;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+
+/**
+ * Represents a web request whose work is performed asynchronously. It exposes the request's completion
+ * status, the time its status last changed, the user who submitted it, and its results or failure reason.
+ *
+ * @param <T> the type of the results produced by the request
+ */
+public interface AsynchronousWebRequest<T> {
+
+    /**
+     * @return the ID of the process group that the request is for
+     */
+    String getProcessGroupId();
+
+    /**
+     * @return whether or not this request has completed
+     */
+    boolean isComplete();
+
+    /**
+     * @return the Date at which the status of this request was last updated
+     */
+    Date getLastUpdated();
+
+    /**
+     * Updates the Date at which the status of this request was last updated
+     *
+     * @param date the date at which the status of this request was last updated
+     */
+    void setLastUpdated(Date date);
+
+    /**
+     * @return the user who submitted the request
+     */
+    NiFiUser getUser();
+
+    /**
+     * Indicates that this request has completed, successfully or otherwise
+     *
+     * @param results the results of the request
+     */
+    void markComplete(T results);
+
+    /**
+     * Updates the request to indicate the reason that the request failed
+     *
+     * @param explanation the reason that the request failed
+     */
+    void setFailureReason(String explanation);
+
+    /**
+     * Indicates the reason that the request failed, or <code>null</code> if the request has not failed
+     *
+     * @return the reason that the request failed, or <code>null</code> if the request has not failed
+     */
+    String getFailureReason();
+
+    /**
+     * Returns the results of the request, if it completed successfully, or <code>null</code> if the request either has not completed or failed
+     *
+     * @return the results of the request, if it completed successfully, or <code>null</code> if the request either has not completed or failed
+     */
+    T getResults();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java
new file mode 100644
index 0000000..580ab47
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.function.Consumer;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+
+/**
+ * Manages requests that are performed in the background. Requests are identified by the combination of
+ * a request type and an ID, and can later be retrieved or removed by the user that submitted them.
+ *
+ * @param <T> the type of the results produced by the managed requests
+ */
+public interface RequestManager<T> {
+
+    /**
+     * Submits a request to be performed in the background
+     *
+     * @param requestType the type of request to submit. This value can be anything and is used along with the id in order to create
+     *            a composite key for the request so that different request types may easily be managed by the RequestManager.
+     * @param id the ID of the request
+     * @param request the request
+     * @param task the task that should be performed in the background
+     *
+     * @throws IllegalArgumentException if a request already exists with the given ID and type
+     * @throws IllegalStateException if the implementation limits the number of outstanding requests and that limit has been reached
+     * @throws NullPointerException if any argument is null
+     */
+    void submitRequest(String requestType, String id, AsynchronousWebRequest<T> request, Consumer<AsynchronousWebRequest<T>> task);
+
+    /**
+     * Retrieves the request with the given ID
+     *
+     * @param requestType the type of the request being retrieved
+     * @param id the ID of the request
+     * @param user the user who is retrieving the request
+     * @return the request with the given ID
+     *
+     * @throws org.apache.nifi.web.ResourceNotFoundException if no request can be found with the given ID
+     * @throws IllegalArgumentException if the user given is not the user that submitted the request
+     * @throws NullPointerException if the request type, the ID, or the user is null
+     */
+    AsynchronousWebRequest<T> getRequest(String requestType, String id, NiFiUser user);
+
+    /**
+     * Removes the request with the given ID
+     *
+     * @param requestType the type of the request being removed
+     * @param id the ID of the request
+     * @param user the user who is removing the request
+     * @return the request that was removed
+     *
+     * @throws org.apache.nifi.web.ResourceNotFoundException if no request can be found with the given ID
+     * @throws IllegalArgumentException if the user given is not the user that submitted the request
+     * @throws IllegalStateException if the request with the given ID is not yet complete
+     * @throws NullPointerException if the request type, the ID, or the user is null
+     */
+    AsynchronousWebRequest<T> removeRequest(String requestType, String id, NiFiUser user);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
new file mode 100644
index 0000000..8ba9a58
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+
+/**
+ * Standard in-memory implementation of {@link AsynchronousWebRequest}.
+ *
+ * <p>The mutable state is held in volatile fields so that a thread polling the request observes updates made
+ * by the thread performing the work. The {@code complete} flag is always written last, so a reader that sees
+ * {@link #isComplete()} return {@code true} also sees the corresponding results or failure reason.</p>
+ *
+ * @param <T> the type of the results produced by the request
+ */
+public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest<T> {
+    private final String id;
+    private final String processGroupId;
+    private final NiFiUser user;
+
+    private volatile boolean complete = false;
+    private volatile Date lastUpdated = new Date();
+    private volatile String failureReason;
+    private volatile T results;
+
+    /**
+     * @param requestId the ID of this request
+     * @param processGroupId the ID of the process group that the request is for
+     * @param user the user who submitted the request
+     */
+    public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user) {
+        this.id = requestId;
+        this.processGroupId = processGroupId;
+        this.user = user;
+    }
+
+    /**
+     * @return the ID of this request
+     */
+    public String getRequestId() {
+        return id;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return complete;
+    }
+
+    @Override
+    public String getProcessGroupId() {
+        return processGroupId;
+    }
+
+    @Override
+    public void markComplete(final T results) {
+        // Publish the results and timestamp before flipping the flag, so that a reader
+        // who observes complete == true never sees stale results.
+        this.results = results;
+        this.lastUpdated = new Date();
+        this.complete = true;
+    }
+
+    @Override
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
+
+    @Override
+    public void setLastUpdated(final Date date) {
+        // Store the supplied date; previously the field was assigned to itself and the argument was ignored.
+        this.lastUpdated = date;
+    }
+
+    @Override
+    public NiFiUser getUser() {
+        return user;
+    }
+
+    /**
+     * Records the failure reason and marks the request as complete with no results.
+     *
+     * @param explanation the reason that the request failed; must not be null
+     * @throws NullPointerException if the explanation is null
+     */
+    @Override
+    public void setFailureReason(final String explanation) {
+        // As in markComplete, the flag is written last so readers see a consistent state.
+        this.failureReason = Objects.requireNonNull(explanation);
+        this.results = null;
+        this.complete = true;
+    }
+
+    @Override
+    public String getFailureReason() {
+        return failureReason;
+    }
+
+    @Override
+    public T getResults() {
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 58abdea..489e590 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,33 @@
  */
 package org.apache.nifi.web.api.dto;
 
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -112,6 +139,15 @@ import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
 import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedLabel;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteProcessGroup;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
 import org.apache.nifi.remote.RemoteGroupPort;
@@ -160,42 +196,19 @@ import org.apache.nifi.web.api.entity.AllowableValueEntity;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity;
 import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.revision.RevisionManager;
 
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -616,6 +629,7 @@ public final class DtoFactory {
         dto.setzIndex(connection.getZIndex());
         dto.setSource(createConnectableDto(connection.getSource()));
         dto.setDestination(createConnectableDto(connection.getDestination()));
+        dto.setVersionedComponentId(connection.getVersionedComponentId().orElse(null));
 
         dto.setBackPressureObjectThreshold(connection.getFlowFileQueue().getBackPressureObjectThreshold());
         dto.setBackPressureDataSizeThreshold(connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
@@ -667,6 +681,7 @@ public final class DtoFactory {
         dto.setId(connectable.getIdentifier());
         dto.setName(isAuthorized ? connectable.getName() : connectable.getIdentifier());
         dto.setType(connectable.getConnectableType().name());
+        dto.setVersionedComponentId(connectable.getVersionedComponentId().orElse(null));
 
         if (connectable instanceof RemoteGroupPort) {
             final RemoteGroupPort remoteGroupPort = (RemoteGroupPort) connectable;
@@ -708,6 +723,7 @@ public final class DtoFactory {
         dto.setWidth(label.getSize().getWidth());
         dto.setLabel(label.getValue());
         dto.setParentGroupId(label.getProcessGroup().getIdentifier());
+        dto.setVersionedComponentId(label.getVersionedComponentId().orElse(null));
 
         return dto;
     }
@@ -824,6 +840,7 @@ public final class DtoFactory {
         dto.setId(funnel.getIdentifier());
         dto.setPosition(createPositionDto(funnel.getPosition()));
         dto.setParentGroupId(funnel.getProcessGroup().getIdentifier());
+        dto.setVersionedComponentId(funnel.getVersionedComponentId().orElse(null));
 
         return dto;
     }
@@ -1228,6 +1245,7 @@ public final class DtoFactory {
         dto.setParentGroupId(port.getProcessGroup().getIdentifier());
         dto.setState(port.getScheduledState().toString());
         dto.setType(port.getConnectableType().name());
+        dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null));
 
         // if this port is on the root group, determine if its actually connected to another nifi
         if (port instanceof RootGroupPort) {
@@ -1354,6 +1372,7 @@ public final class DtoFactory {
         dto.setDeprecated(controllerServiceNode.isDeprecated());
         dto.setExtensionMissing(controllerServiceNode.isExtensionMissing());
         dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1);
+        dto.setVersionedComponentId(controllerServiceNode.getVersionedComponentId().orElse(null));
 
         // sort a copy of the properties
         final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
@@ -1511,6 +1530,7 @@ public final class DtoFactory {
         dto.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks());
         dto.setUseCompression(port.isUseCompression());
         dto.setExists(port.getTargetExists());
+        dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null));
 
         final BatchSettingsDTO batchDTO = new BatchSettingsDTO();
         batchDTO.setCount(port.getBatchCount());
@@ -1619,6 +1639,7 @@ public final class DtoFactory {
         dto.setInactiveRemoteInputPortCount(inactiveRemoteInputPortCount);
         dto.setActiveRemoteOutputPortCount(activeRemoteOutputPortCount);
         dto.setInactiveRemoteOutputPortCount(inactiveRemoteOutputPortCount);
+        dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null));
 
         final ProcessGroupCounts counts = group.getCounts();
         if (counts != null) {
@@ -1679,6 +1700,7 @@ public final class DtoFactory {
         dto.setId(componentAuthorizable.getIdentifier());
         dto.setParentGroupId(componentAuthorizable.getProcessGroupIdentifier());
         dto.setName(authorizable.getResource().getName());
+
         return dto;
     }
 
@@ -1738,6 +1760,81 @@ public final class DtoFactory {
         return dto;
     }
 
+    /**
+     * Creates an AffectedComponentEntity that describes the given processor.
+     *
+     * @param processorEntity the processor entity to convert
+     * @return the affected component entity, or <code>null</code> if the given entity is <code>null</code>
+     */
+    public AffectedComponentEntity createAffectedComponentEntity(final ProcessorEntity processorEntity) {
+        if (processorEntity == null) {
+            return null;
+        }
+
+        final AffectedComponentEntity component = new AffectedComponentEntity();
+        component.setBulletins(processorEntity.getBulletins());
+        component.setId(processorEntity.getId());
+        component.setPermissions(processorEntity.getPermissions());
+        component.setPosition(processorEntity.getPosition());
+        component.setRevision(processorEntity.getRevision());
+        component.setUri(processorEntity.getUri());
+
+        final ProcessorDTO processorDto = processorEntity.getComponent();
+        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+        componentDto.setId(processorDto.getId());
+        componentDto.setName(processorDto.getName());
+        componentDto.setProcessGroupId(processorDto.getParentGroupId());
+        // The component being described is a processor, so it must not be tagged as a controller service.
+        componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+        componentDto.setState(processorDto.getState());
+        componentDto.setValidationErrors(processorDto.getValidationErrors());
+        component.setComponent(componentDto);
+
+        return component;
+    }
+
+    /**
+     * Creates an AffectedComponentEntity that describes the given controller service.
+     *
+     * @param serviceEntity the controller service entity to convert
+     * @return the affected component entity, or <code>null</code> if the given entity is <code>null</code>
+     */
+    public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) {
+        if (serviceEntity == null) {
+            return null;
+        }
+
+        // entity-level attributes are copied straight from the service entity
+        final AffectedComponentEntity affectedEntity = new AffectedComponentEntity();
+        affectedEntity.setBulletins(serviceEntity.getBulletins());
+        affectedEntity.setId(serviceEntity.getId());
+        affectedEntity.setPermissions(serviceEntity.getPermissions());
+        affectedEntity.setPosition(serviceEntity.getPosition());
+        affectedEntity.setRevision(serviceEntity.getRevision());
+        affectedEntity.setUri(serviceEntity.getUri());
+
+        // component-level attributes come from the service's DTO
+        final ControllerServiceDTO service = serviceEntity.getComponent();
+        final AffectedComponentDTO affectedDto = new AffectedComponentDTO();
+        affectedDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+        affectedDto.setId(service.getId());
+        affectedDto.setName(service.getName());
+        affectedDto.setProcessGroupId(service.getParentGroupId());
+        affectedDto.setState(service.getState());
+        affectedDto.setValidationErrors(service.getValidationErrors());
+
+        affectedEntity.setComponent(affectedDto);
+        return affectedEntity;
+    }
+
+    /**
+     * Creates an AffectedComponentEntity that describes the given port of a remote process group. The permissions,
+     * revision, and URI are taken from the remote process group entity.
+     *
+     * @param remotePortDto the remote port to describe
+     * @param referenceType the reference type to assign to the affected component
+     * @param rpgEntity the entity of the remote process group
+     * @return the affected component entity, or <code>null</code> if the given port is <code>null</code>
+     */
+    public AffectedComponentEntity createAffectedComponentEntity(final RemoteProcessGroupPortDTO remotePortDto, final String referenceType, final RemoteProcessGroupEntity rpgEntity) {
+        if (remotePortDto == null) {
+            return null;
+        }
+
+        final AffectedComponentEntity affectedEntity = new AffectedComponentEntity();
+        affectedEntity.setId(remotePortDto.getId());
+        affectedEntity.setPermissions(rpgEntity.getPermissions());
+        affectedEntity.setRevision(rpgEntity.getRevision());
+        affectedEntity.setUri(rpgEntity.getUri());
+
+        final AffectedComponentDTO affectedDto = new AffectedComponentDTO();
+        affectedDto.setId(remotePortDto.getId());
+        affectedDto.setName(remotePortDto.getName());
+        affectedDto.setProcessGroupId(remotePortDto.getGroupId());
+        affectedDto.setReferenceType(referenceType);
+
+        // a transmitting port is reported as running; any other port as stopped
+        final String portState;
+        if (remotePortDto.isTransmitting()) {
+            portState = "Running";
+        } else {
+            portState = "Stopped";
+        }
+        affectedDto.setState(portState);
+
+        affectedEntity.setComponent(affectedDto);
+        return affectedEntity;
+    }
+
+
     public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) {
         final AffectedComponentDTO dto = new AffectedComponentDTO();
         dto.setId(component.getIdentifier());
@@ -2047,6 +2144,8 @@ public final class DtoFactory {
         dto.setPosition(createPositionDto(group.getPosition()));
         dto.setComments(group.getComments());
         dto.setName(group.getName());
+        dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null));
+        dto.setVersionControlInformation(createVersionControlInformationDto(group.getVersionControlInformation()));
 
         final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream()
             .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
@@ -2070,6 +2169,68 @@ public final class DtoFactory {
         return dto;
     }
 
+    /**
+     * Converts the given Version Control Information into its DTO representation.
+     *
+     * @param versionControlInfo the Version Control Information to convert; may be null
+     * @return a DTO carrying the registry, bucket, flow, version, current and modified values,
+     *         or null if versionControlInfo is null
+     */
+    public VersionControlInformationDTO createVersionControlInformationDto(final VersionControlInformation versionControlInfo) {
+        if (versionControlInfo == null) {
+            return null;
+        }
+
+        final VersionControlInformationDTO vciDto = new VersionControlInformationDTO();
+
+        // where the flow lives
+        vciDto.setRegistryId(versionControlInfo.getRegistryIdentifier());
+        vciDto.setBucketId(versionControlInfo.getBucketIdentifier());
+        vciDto.setFlowId(versionControlInfo.getFlowIdentifier());
+        vciDto.setVersion(versionControlInfo.getVersion());
+
+        // optional status flags; an absent value is exposed as null
+        vciDto.setCurrent(versionControlInfo.getCurrent().orElse(null));
+        vciDto.setModified(versionControlInfo.getModified().orElse(null));
+
+        return vciDto;
+    }
+
+    /**
+     * Builds a mapping of instance ID to versioned component identifier for the given group and,
+     * recursively, for all of its child groups.
+     *
+     * Entries are added for the group itself, its processors, input ports, output ports, controller services,
+     * labels, connections, remote process groups and the input/output ports of those remote process groups.
+     * Every component is cast to its "Instantiated" counterpart, so a component of any other type results in a
+     * ClassCastException.
+     *
+     * NOTE(review): funnels are not added to the mapping here; confirm that this is intentional.
+     *
+     * @param group the instantiated versioned process group to walk
+     * @return a mutable map whose keys are instance IDs and whose values are versioned component identifiers
+     */
+    public Map<String, String> createVersionControlComponentMappingDto(final InstantiatedVersionedProcessGroup group) {
+        final Map<String, String> idMapping = new HashMap<>();
+        idMapping.put(group.getInstanceId(), group.getIdentifier());
+
+        group.getProcessors().forEach(component -> {
+            final InstantiatedVersionedProcessor processor = (InstantiatedVersionedProcessor) component;
+            idMapping.put(processor.getInstanceId(), processor.getIdentifier());
+        });
+
+        group.getInputPorts().forEach(component -> {
+            final InstantiatedVersionedPort inputPort = (InstantiatedVersionedPort) component;
+            idMapping.put(inputPort.getInstanceId(), inputPort.getIdentifier());
+        });
+
+        group.getOutputPorts().forEach(component -> {
+            final InstantiatedVersionedPort outputPort = (InstantiatedVersionedPort) component;
+            idMapping.put(outputPort.getInstanceId(), outputPort.getIdentifier());
+        });
+
+        group.getControllerServices().forEach(component -> {
+            final InstantiatedVersionedControllerService controllerService = (InstantiatedVersionedControllerService) component;
+            idMapping.put(controllerService.getInstanceId(), controllerService.getIdentifier());
+        });
+
+        group.getLabels().forEach(component -> {
+            final InstantiatedVersionedLabel label = (InstantiatedVersionedLabel) component;
+            idMapping.put(label.getInstanceId(), label.getIdentifier());
+        });
+
+        group.getConnections().forEach(component -> {
+            final InstantiatedVersionedConnection connection = (InstantiatedVersionedConnection) component;
+            idMapping.put(connection.getInstanceId(), connection.getIdentifier());
+        });
+
+        // a remote process group contributes itself plus each of its remote input and output ports
+        group.getRemoteProcessGroups().forEach(component -> {
+            final InstantiatedVersionedRemoteProcessGroup remoteGroup = (InstantiatedVersionedRemoteProcessGroup) component;
+            idMapping.put(remoteGroup.getInstanceId(), remoteGroup.getIdentifier());
+
+            remoteGroup.getInputPorts().forEach(remotePort -> {
+                final InstantiatedVersionedRemoteGroupPort remoteInputPort = (InstantiatedVersionedRemoteGroupPort) remotePort;
+                idMapping.put(remoteInputPort.getInstanceId(), remoteInputPort.getIdentifier());
+            });
+
+            remoteGroup.getOutputPorts().forEach(remotePort -> {
+                final InstantiatedVersionedRemoteGroupPort remoteOutputPort = (InstantiatedVersionedRemoteGroupPort) remotePort;
+                idMapping.put(remoteOutputPort.getInstanceId(), remoteOutputPort.getIdentifier());
+            });
+        });
+
+        // descend into child groups and merge their mappings into this one
+        group.getProcessGroups().forEach(component -> {
+            final InstantiatedVersionedProcessGroup childGroup = (InstantiatedVersionedProcessGroup) component;
+            idMapping.putAll(createVersionControlComponentMappingDto(childGroup));
+        });
+
+        return idMapping;
+    }
+
+
     /**
      * Creates a ProcessGroupContentDTO from the specified ProcessGroup.
      *
@@ -2418,6 +2579,7 @@ public final class DtoFactory {
         dto.setDeprecated(node.isDeprecated());
         dto.setExtensionMissing(node.isExtensionMissing());
         dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1);
+        dto.setVersionedComponentId(node.getVersionedComponentId().orElse(null));
 
         dto.setType(node.getCanonicalClassName());
         dto.setBundle(createBundleDto(bundleCoordinate));
@@ -2989,6 +3151,7 @@ public final class DtoFactory {
         copy.setPosition(original.getPosition());
         copy.setWidth(original.getWidth());
         copy.setHeight(original.getHeight());
+        copy.setVersionedComponentId(original.getVersionedComponentId());
 
         return copy;
     }
@@ -3012,6 +3175,7 @@ public final class DtoFactory {
         copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable());
         copy.setPersistsState(original.getPersistsState());
         copy.setValidationErrors(copy(original.getValidationErrors()));
+        copy.setVersionedComponentId(original.getVersionedComponentId());
         return copy;
     }
 
@@ -3020,6 +3184,7 @@ public final class DtoFactory {
         copy.setId(original.getId());
         copy.setParentGroupId(original.getParentGroupId());
         copy.setPosition(original.getPosition());
+        copy.setVersionedComponentId(original.getVersionedComponentId());
 
         return copy;
     }
@@ -3088,6 +3253,7 @@ public final class DtoFactory {
         copy.setExtensionMissing(original.getExtensionMissing());
         copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable());
         copy.setValidationErrors(copy(original.getValidationErrors()));
+        copy.setVersionedComponentId(original.getVersionedComponentId());
 
         return copy;
     }
@@ -3132,6 +3298,7 @@ public final class DtoFactory {
         copy.setzIndex(original.getzIndex());
         copy.setLabelIndex(original.getLabelIndex());
         copy.setBends(copy(original.getBends()));
+        copy.setVersionedComponentId(original.getVersionedComponentId());
 
         return copy;
     }
@@ -3164,6 +3331,7 @@ public final class DtoFactory {
         copy.setUserAccessControl(copy(original.getUserAccessControl()));
         copy.setGroupAccessControl(copy(original.getGroupAccessControl()));
         copy.setValidationErrors(copy(original.getValidationErrors()));
+        copy.setVersionedComponentId(original.getVersionedComponentId());
         return copy;
     }
 
@@ -3180,6 +3348,8 @@ public final class DtoFactory {
         copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount());
         copy.setUseCompression(original.getUseCompression());
         copy.setExists(original.getExists());
+        copy.setVersionedComponentId(original.getVersionedComponentId());
+
         final BatchSettingsDTO batchOrg = original.getBatchSettings();
         if (batchOrg != null) {
             final BatchSettingsDTO batchCopy = new BatchSettingsDTO();
@@ -3199,8 +3369,10 @@ public final class DtoFactory {
         copy.setInputPortCount(original.getInputPortCount());
         copy.setInvalidCount(original.getInvalidCount());
         copy.setName(original.getName());
+        copy.setVersionControlInformation(copy(original.getVersionControlInformation()));
         copy.setOutputPortCount(original.getOutputPortCount());
         copy.setParentGroupId(original.getParentGroupId());
+        copy.setVersionedComponentId(original.getVersionedComponentId());
 
         copy.setRunningCount(original.getRunningCount());
         copy.setStoppedCount(original.getStoppedCount());
@@ -3215,6 +3387,21 @@ public final class DtoFactory {
         return copy;
     }
 
+    /**
+     * Creates a copy of the given Version Control Information DTO.
+     *
+     * The registry ID, bucket ID, flow ID, version, current flag and modified flag are carried over.
+     * NOTE(review): the group ID of the original is not copied; confirm that this is intentional.
+     *
+     * @param original the DTO to copy; may be null
+     * @return a new DTO with the values listed above, or null if original is null
+     */
+    public VersionControlInformationDTO copy(final VersionControlInformationDTO original) {
+        if (original == null) {
+            return null;
+        }
+
+        final VersionControlInformationDTO duplicate = new VersionControlInformationDTO();
+
+        // location of the versioned flow
+        duplicate.setRegistryId(original.getRegistryId());
+        duplicate.setBucketId(original.getBucketId());
+        duplicate.setFlowId(original.getFlowId());
+        duplicate.setVersion(original.getVersion());
+
+        // status flags
+        duplicate.setCurrent(original.getCurrent());
+        duplicate.setModified(original.getModified());
+
+        return duplicate;
+    }
+
     public RemoteProcessGroupDTO copy(final RemoteProcessGroupDTO original) {
         final RemoteProcessGroupContentsDTO originalContents = original.getContents();
         final RemoteProcessGroupContentsDTO copyContents = new RemoteProcessGroupContentsDTO();
@@ -3256,6 +3443,7 @@ public final class DtoFactory {
         copy.setProxyUser(original.getProxyUser());
         copy.setProxyPassword(original.getProxyPassword());
         copy.setLocalNetworkInterface(original.getLocalNetworkInterface());
+        copy.setVersionedComponentId(original.getVersionedComponentId());
 
         copy.setContents(copyContents);
 
@@ -3268,6 +3456,7 @@ public final class DtoFactory {
         connectable.setId(port.getId());
         connectable.setName(port.getName());
         connectable.setType(type.name());
+        connectable.setVersionedComponentId(port.getVersionedComponentId());
         return connectable;
     }
 
@@ -3277,6 +3466,7 @@ public final class DtoFactory {
         connectable.setId(processor.getId());
         connectable.setName(processor.getName());
         connectable.setType(ConnectableType.PROCESSOR.name());
+        connectable.setVersionedComponentId(processor.getVersionedComponentId());
         return connectable;
     }
 
@@ -3285,6 +3475,7 @@ public final class DtoFactory {
         connectable.setGroupId(funnel.getParentGroupId());
         connectable.setId(funnel.getId());
         connectable.setType(ConnectableType.FUNNEL.name());
+        connectable.setVersionedComponentId(funnel.getVersionedComponentId());
         return connectable;
     }
 
@@ -3294,6 +3485,7 @@ public final class DtoFactory {
         connectable.setId(remoteGroupPort.getId());
         connectable.setName(remoteGroupPort.getName());
         connectable.setType(type.name());
+        connectable.setVersionedComponentId(remoteGroupPort.getVersionedComponentId());
         return connectable;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 16781c6..dd8d67f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -67,6 +67,7 @@ import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 
 import java.util.Date;
 import java.util.List;
@@ -537,4 +538,11 @@ public final class EntityFactory {
         }
         return entity;
     }
+
+    /**
+     * Wraps the given Version Control Information DTO and process group revision in an entity.
+     *
+     * @param dto the Version Control Information to expose
+     * @param processGroupRevision the revision of the process group that the information belongs to
+     * @return a new entity holding both values
+     */
+    public VersionControlInformationEntity createVersionControlInformationEntity(final VersionControlInformationDTO dto, final RevisionDTO processGroupRevision) {
+        final VersionControlInformationEntity vciEntity = new VersionControlInformationEntity();
+        vciEntity.setProcessGroupRevision(processGroupRevision);
+        vciEntity.setVersionControlInformation(dto);
+
+        return vciEntity;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index a4e8000..615f00b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -1599,6 +1599,7 @@ public class ControllerFacade implements Authorizable {
         final List<String> matches = new ArrayList<>();
 
         addIfAppropriate(searchStr, port.getIdentifier(), "Id", matches);
+        addIfAppropriate(searchStr, port.getVersionedComponentId().orElse(null), "Version Control ID", matches);
         addIfAppropriate(searchStr, port.getName(), "Name", matches);
         addIfAppropriate(searchStr, port.getComments(), "Comments", matches);
 
@@ -1649,6 +1650,7 @@ public class ControllerFacade implements Authorizable {
         final Processor processor = procNode.getProcessor();
 
         addIfAppropriate(searchStr, procNode.getIdentifier(), "Id", matches);
+        addIfAppropriate(searchStr, procNode.getVersionedComponentId().orElse(null), "Version Control ID", matches);
         addIfAppropriate(searchStr, procNode.getName(), "Name", matches);
         addIfAppropriate(searchStr, procNode.getComments(), "Comments", matches);
 
@@ -1753,6 +1755,7 @@ public class ControllerFacade implements Authorizable {
         }
 
         addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
+        addIfAppropriate(searchStr, group.getVersionedComponentId().orElse(null), "Version Control ID", matches);
         addIfAppropriate(searchStr, group.getName(), "Name", matches);
         addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
 
@@ -1783,6 +1786,7 @@ public class ControllerFacade implements Authorizable {
 
         // search id and name
         addIfAppropriate(searchStr, connection.getIdentifier(), "Id", matches);
+        addIfAppropriate(searchStr, connection.getVersionedComponentId().orElse(null), "Version Control ID", matches);
         addIfAppropriate(searchStr, connection.getName(), "Name", matches);
 
         // search relationships
@@ -1864,6 +1868,7 @@ public class ControllerFacade implements Authorizable {
     private ComponentSearchResultDTO search(final String searchStr, final RemoteProcessGroup group) {
         final List<String> matches = new ArrayList<>();
         addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
+        addIfAppropriate(searchStr, group.getVersionedComponentId().orElse(null), "Version Control ID", matches);
         addIfAppropriate(searchStr, group.getName(), "Name", matches);
         addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
         addIfAppropriate(searchStr, group.getTargetUris(), "URLs", matches);
@@ -1889,6 +1894,7 @@ public class ControllerFacade implements Authorizable {
     private ComponentSearchResultDTO search(final String searchStr, final Funnel funnel) {
         final List<String> matches = new ArrayList<>();
         addIfAppropriate(searchStr, funnel.getIdentifier(), "Id", matches);
+        addIfAppropriate(searchStr, funnel.getVersionedComponentId().orElse(null), "Version Control ID", matches);
 
         if (matches.isEmpty()) {
             return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index d7ca806..5f4dba5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -16,14 +16,17 @@
  */
 package org.apache.nifi.web.dao;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 
 public interface ProcessGroupDAO {
 
@@ -104,6 +107,27 @@ public interface ProcessGroupDAO {
     ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup);
 
     /**
+     * Updates the process group so that it matches the proposed flow
+     *
+     * @param groupId the ID of the process group
+     * @param proposedSnapshot the new version of the flow
+     * @param versionControlInformation the new Version Control Information
+     * @param componentIdSeed the seed value to use for generating IDs for new components
+     * @return the process group
+     */
+    ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
+        boolean verifyNotModified);
+
+    /**
+     * Applies the given Version Control Information to the Process Group
+     *
+     * @param versionControlInformation the Version Control Information to apply
+     * @param versionedComponentMapping a mapping of Component ID to Versioned Component ID
+     * @return the Process Group
+     */
+    ProcessGroup updateVersionControlInformation(VersionControlInformationDTO versionControlInformation, Map<String, String> versionedComponentMapping);
+
+    /**
      * Updates the specified variable registry
      *
      * @param variableRegistry the Variable Registry

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index ec584de..6fa316d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -16,15 +16,14 @@
  */
 package org.apache.nifi.web.dao.impl;
 
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
@@ -33,9 +32,14 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
 
@@ -90,24 +94,30 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
-        final Set<Connectable> connectables = new HashSet<>(componentIds.size());
         for (final String componentId : componentIds) {
             final Connectable connectable = group.findLocalConnectable(componentId);
             if (connectable == null) {
-                throw new ResourceNotFoundException("Unable to find component with id " + componentId);
-            }
+                final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
+                if (remotePort == null) {
+                    throw new ResourceNotFoundException("Unable to find component with id " + componentId);
+                }
 
-            connectables.add(connectable);
-        }
+                if (ScheduledState.RUNNING.equals(state)) {
+                    remotePort.verifyCanStart();
+                } else {
+                    remotePort.verifyCanStop();
+                }
 
-        // verify as appropriate
-        connectables.forEach(connectable -> {
+                continue;
+            }
+
+            // verify as appropriate
             if (ScheduledState.RUNNING.equals(state)) {
                 group.verifyCanStart(connectable);
             } else {
                 group.verifyCanStop(connectable);
             }
-        });
+        }
     }
 
     @Override
@@ -134,22 +144,46 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
         for (final String componentId : componentIds) {
             final Connectable connectable = group.findLocalConnectable(componentId);
             if (ScheduledState.RUNNING.equals(state)) {
-                if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
-                    final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
-                    future = CompletableFuture.allOf(future, processorFuture);
-                } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
-                    connectable.getProcessGroup().startInputPort((Port) connectable);
-                } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) {
-                    connectable.getProcessGroup().startOutputPort((Port) connectable);
+                switch (connectable.getConnectableType()) {
+                    case PROCESSOR:
+                        final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
+                        future = CompletableFuture.allOf(future, processorFuture);
+                        break;
+                    case INPUT_PORT:
+                        connectable.getProcessGroup().startInputPort((Port) connectable);
+                        break;
+                    case OUTPUT_PORT:
+                        connectable.getProcessGroup().startOutputPort((Port) connectable);
+                        break;
+                    case REMOTE_INPUT_PORT:
+                        final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId);
+                        remoteInputPort.getRemoteProcessGroup().startTransmitting(remoteInputPort);
+                        break;
+                    case REMOTE_OUTPUT_PORT:
+                        final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId);
+                        remoteOutputPort.getRemoteProcessGroup().startTransmitting(remoteOutputPort);
+                        break;
                 }
             } else {
-                if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
-                    final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
-                    future = CompletableFuture.allOf(future, processorFuture);
-                } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
-                    connectable.getProcessGroup().stopInputPort((Port) connectable);
-                } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) {
-                    connectable.getProcessGroup().stopOutputPort((Port) connectable);
+                switch (connectable.getConnectableType()) {
+                    case PROCESSOR:
+                        final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
+                        future = CompletableFuture.allOf(future, processorFuture);
+                        break;
+                    case INPUT_PORT:
+                        connectable.getProcessGroup().stopInputPort((Port) connectable);
+                        break;
+                    case OUTPUT_PORT:
+                        connectable.getProcessGroup().stopOutputPort((Port) connectable);
+                        break;
+                    case REMOTE_INPUT_PORT:
+                        final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId);
+                        remoteInputPort.getRemoteProcessGroup().stopTransmitting(remoteInputPort);
+                        break;
+                    case REMOTE_OUTPUT_PORT:
+                        final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId);
+                        remoteOutputPort.getRemoteProcessGroup().stopTransmitting(remoteOutputPort);
+                        break;
                 }
             }
         }
@@ -197,6 +231,41 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
+    public ProcessGroup updateVersionControlInformation(final VersionControlInformationDTO versionControlInformation, final Map<String, String> versionedComponentMapping) {
+        // the DTO itself names the process group that it applies to
+        final ProcessGroup processGroup = locateProcessGroup(flowController, versionControlInformation.getGroupId());
+
+        // unboxed into an int before the Version Control Information is built
+        final int version = versionControlInformation.getVersion();
+
+        // The trailing arguments are presumably: no flow snapshot, modified = false, current = true
+        // (this matches the argument order used by updateProcessGroupFlow) - TODO confirm
+        final VersionControlInformation updatedVci = new StandardVersionControlInformation(
+            versionControlInformation.getRegistryId(),
+            versionControlInformation.getBucketId(),
+            versionControlInformation.getFlowId(),
+            version,
+            null,
+            false,
+            true);
+
+        processGroup.setVersionControlInformation(updatedVci, versionedComponentMapping);
+        return processGroup;
+    }
+
+    @Override
+    public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
+        final String componentIdSeed, final boolean verifyNotModified) {
+
+        // Step 1: update the contents of the group from the proposed snapshot
+        final ProcessGroup processGroup = locateProcessGroup(flowController, groupId);
+        processGroup.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified);
+
+        // Step 2: record the Version Control Information for the group, built from the DTO
+        // and the contents of the snapshot that was just applied
+        final VersionControlInformation updatedVci = new StandardVersionControlInformation(
+            versionControlInformation.getRegistryId(),
+            versionControlInformation.getBucketId(),
+            versionControlInformation.getFlowId(),
+            versionControlInformation.getVersion(),
+            proposedSnapshot.getFlowContents(),
+            versionControlInformation.getModified(),
+            versionControlInformation.getCurrent());
+
+        // no versioned component mapping is supplied at this point, hence the empty map
+        processGroup.setVersionControlInformation(updatedVci, Collections.emptyMap());
+
+        return processGroup;
+    }
+
+    @Override
     public ProcessGroup updateVariableRegistry(final VariableRegistryDTO variableRegistry) {
         final ProcessGroup group = locateProcessGroup(flowController, variableRegistry.getProcessGroupId());
         if (group == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
new file mode 100644
index 0000000..7fdaf56
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.util;
+
+import java.util.Optional;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
+import org.apache.nifi.web.api.dto.DtoFactory;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+
+public class AffectedComponentUtils {
+
+    /**
+     * Re-fetches the component described by the given entity through the service facade and returns a
+     * freshly built AffectedComponentEntity for it.
+     *
+     * Processors, remote input ports and remote output ports are handled. For remote ports, the entity's
+     * process group ID is used to look up the Remote Process Group that contains the port.
+     *
+     * @param componentEntity the entity to refresh; its component and reference type must be set
+     * @param serviceFacade the facade used to fetch the current state of the component
+     * @param dtoFactory the factory used to build the refreshed entity
+     * @param user the user on whose behalf the component is fetched
+     * @return the refreshed entity, or null if the reference type is not one of the handled types or the
+     *         remote port cannot be found in its Remote Process Group
+     */
+    public static AffectedComponentEntity updateEntity(final AffectedComponentEntity componentEntity, final NiFiServiceFacade serviceFacade,
+                final DtoFactory dtoFactory, final NiFiUser user) {
+
+        switch (componentEntity.getComponent().getReferenceType()) {
+            case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: {
+                final ProcessorEntity processorEntity = serviceFacade.getProcessor(componentEntity.getId(), user);
+                return dtoFactory.createAffectedComponentEntity(processorEntity);
+            }
+            case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT:
+                return refreshRemotePort(componentEntity, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT, serviceFacade, dtoFactory, user);
+            case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT:
+                return refreshRemotePort(componentEntity, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT, serviceFacade, dtoFactory, user);
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Looks up a remote port among the input or output ports of its Remote Process Group, depending on the
+     * given reference type, and builds a refreshed entity for it.
+     *
+     * @return the refreshed entity, or null if no port with the entity's ID exists in the Remote Process Group
+     */
+    private static AffectedComponentEntity refreshRemotePort(final AffectedComponentEntity componentEntity, final String referenceType,
+                final NiFiServiceFacade serviceFacade, final DtoFactory dtoFactory, final NiFiUser user) {
+
+        final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user);
+        final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents();
+
+        // input ports are searched for the remote input port type, output ports otherwise
+        final boolean inputPort = AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(referenceType);
+        final Optional<RemoteProcessGroupPortDTO> matchingPort = (inputPort ? remoteGroupContents.getInputPorts() : remoteGroupContents.getOutputPorts()).stream()
+            .filter(port -> port.getId().equals(componentEntity.getId()))
+            .findFirst();
+
+        return matchingPort
+            .map(portDto -> dtoFactory.createAffectedComponentEntity(portDto, referenceType, remoteGroupEntity))
+            .orElse(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
new file mode 100644
index 0000000..a6efb71
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Pause} that blocks the calling thread for a fixed interval on each call to
+ * {@link #pause()}, until either {@link #cancel()} has been called or an overall deadline,
+ * measured from the moment of construction, has passed.
+ */
+public class CancellableTimedPause implements Pause {
+    // Deadline on the System.nanoTime() clock. The sum may wrap around, so it is only ever
+    // compared by subtraction, never with a plain '<'.
+    private final long expirationNanoTime;
+    private final long pauseNanos;
+    // volatile so that a cancel() issued by another thread is visible to the pausing thread
+    private volatile boolean cancelled = false;
+
+    /**
+     * @param pauseTime how long each call to {@link #pause()} should block; a value that converts
+     *            to less than one nanosecond is raised to one nanosecond
+     * @param expirationTime how long after construction {@link #pause()} starts returning {@code false}
+     * @param timeUnit the unit of both {@code pauseTime} and {@code expirationTime}
+     */
+    public CancellableTimedPause(final long pauseTime, final long expirationTime, final TimeUnit timeUnit) {
+        final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationTime, timeUnit);
+        expirationNanoTime = System.nanoTime() + expirationNanos;
+        pauseNanos = Math.max(1L, TimeUnit.NANOSECONDS.convert(pauseTime, timeUnit));
+    }
+
+    /**
+     * Marks this pause as cancelled so that current and future calls to {@link #pause()} return
+     * {@code false}. A thread that is already sleeping inside {@link #pause()} is not woken up;
+     * it observes the cancellation once its current sleep ends.
+     */
+    public void cancel() {
+        cancelled = true;
+    }
+
+    /**
+     * Blocks the calling thread for the configured pause interval.
+     *
+     * @return {@code true} if the full interval elapsed, the deadline has not yet passed and
+     *         {@link #cancel()} has not been called; {@code false} if the pause was cancelled,
+     *         the deadline has passed, or the thread was interrupted (in which case the
+     *         interrupt flag is restored)
+     */
+    @Override
+    public boolean pause() {
+        if (cancelled) {
+            return false;
+        }
+
+        final long startNanos = System.nanoTime();
+        long remainingNanos = pauseNanos;
+        while (remainingNanos > 0L && !cancelled) {
+            try {
+                // Must be sleep(), not wait(): TimeUnit.NANOSECONDS.wait(n) is Object.wait(n) on the
+                // enum constant, which throws IllegalMonitorStateException when the monitor is not
+                // held and would interpret n as milliseconds.
+                TimeUnit.NANOSECONDS.sleep(remainingNanos);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+
+            // Sleep only for whatever is left if the thread woke up early.
+            remainingNanos = pauseNanos - (System.nanoTime() - startNanos);
+        }
+
+        // Overflow-safe nanoTime comparison: a negative difference means the deadline is still ahead.
+        return !cancelled && System.nanoTime() - expirationNanoTime < 0L;
+    }
+
+}