You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/05 22:00:25 UTC

[1/3] incubator-nifi git commit: NIFI-250: - Refactoring revision checking so that we can lock appropriately on the Cluster Manager to manage controller services running there while other concurrent requests can be replicated amongst the cluster.

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 17add531f -> 4a8da6033


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
index 224ab18..a9dce5f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
@@ -251,7 +251,7 @@ public class OutputPortResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final OutputPortEntity entity = new OutputPortEntity();
@@ -447,7 +447,7 @@ public class OutputPortResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final OutputPortEntity entity = new OutputPortEntity();
@@ -504,7 +504,7 @@ public class OutputPortResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final OutputPortEntity entity = new OutputPortEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 6439bda..1bf3f77 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -287,7 +287,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity();
@@ -365,7 +365,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final FlowSnippetEntity entity = new FlowSnippetEntity();
@@ -441,7 +441,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(response.getRevision());
+        revision.setVersion(response.getVersion());
 
         // create the response entity
         final FlowSnippetEntity entity = new FlowSnippetEntity();
@@ -559,7 +559,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(response.getRevision());
+        updatedRevision.setVersion(response.getVersion());
 
         // create the response entity
         final ProcessGroupEntity entity = new ProcessGroupEntity();
@@ -616,7 +616,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity();
@@ -795,7 +795,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ProcessGroupEntity entity = new ProcessGroupEntity();
@@ -931,7 +931,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // create the revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(response.getRevision());
+        updatedRevision.setVersion(response.getVersion());
 
         // create the response entity
         final ProcessGroupEntity entity = new ProcessGroupEntity();
@@ -989,7 +989,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ProcessGroupEntity entity = new ProcessGroupEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index b11c40a..75995dd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -288,7 +288,7 @@ public class ProcessorResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // generate the response entity
         final ProcessorEntity entity = new ProcessorEntity();
@@ -607,7 +607,7 @@ public class ProcessorResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // generate the response entity
         final ProcessorEntity entity = new ProcessorEntity();
@@ -664,7 +664,7 @@ public class ProcessorResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(clientId.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // generate the response entity
         final ProcessorEntity entity = new ProcessorEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index 6a5b536..12bb46f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -392,7 +392,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final RemoteProcessGroupEntity entity = new RemoteProcessGroupEntity();
@@ -448,7 +448,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final RemoteProcessGroupEntity entity = new RemoteProcessGroupEntity();
@@ -582,7 +582,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity();
@@ -716,7 +716,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity();
@@ -882,7 +882,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final RemoteProcessGroupEntity entity = new RemoteProcessGroupEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java
index e0b7788..275b133 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java
@@ -340,7 +340,7 @@ public class SnippetResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(response.getRevision());
+        updatedRevision.setVersion(response.getVersion());
 
         // build the response entity
         SnippetEntity entity = new SnippetEntity();
@@ -520,7 +520,7 @@ public class SnippetResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         SnippetEntity entity = new SnippetEntity();
@@ -577,7 +577,7 @@ public class SnippetResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         SnippetEntity entity = new SnippetEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index aef2feb..ab7cd4c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -127,6 +127,7 @@ import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.web.FlowModification;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO.ControllerServiceReferenceDTO;
 
 /**
@@ -2252,14 +2253,17 @@ public final class DtoFactory {
     /**
      * Factory method for creating a new RevisionDTO based on this controller.
      *
-     * @param revision
+     * @param lastMod
      * @return
      */
-    public RevisionDTO createRevisionDTO(Revision revision) {
+    public RevisionDTO createRevisionDTO(FlowModification lastMod) {
+        final Revision revision = lastMod.getRevision();
+        
         // create the dto
         final RevisionDTO revisionDTO = new RevisionDTO();
         revisionDTO.setVersion(revision.getVersion());
         revisionDTO.setClientId(revision.getClientId());
+        revisionDTO.setLastModifier(lastMod.getLastModifier());
 
         return revisionDTO;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
index 8112b7b..8436793 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.web.spring;
 
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.ClusterAwareOptimisticLockingManager;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.StandardOptimisticLockingManager;
 import org.springframework.beans.BeansException;
@@ -40,7 +39,7 @@ public class OptimisticLockingManagerFactoryBean implements FactoryBean, Applica
             if (properties.isClusterManager()) {
                 optimisticLockingManager = context.getBean("clusterManagerOptimisticLockingManager", OptimisticLockingManager.class);
             } else {
-                optimisticLockingManager = new ClusterAwareOptimisticLockingManager(new StandardOptimisticLockingManager());
+                optimisticLockingManager = new StandardOptimisticLockingManager();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/pom.xml
index 0857d6f..4efcd37 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/pom.xml
@@ -27,5 +27,17 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-administration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-security</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-framework-cluster-web</artifactId>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
new file mode 100644
index 0000000..3d7db9c
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web;
+
+/**
+ * 
+ * @param <T>
+ */
+public interface ConfigurationRequest<T> {
+
+    T execute();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
index 6ad683c..8817d69 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
@@ -22,36 +22,36 @@ package org.apache.nifi.web;
  */
 public class ConfigurationSnapshot<T> {
 
-    private Long revision;
+    private Long version;
     private T configuration;
 
     /**
      * Creates a new ConfigurationSnapshot.
      *
-     * @param revision The model revision
+     * @param version The revision version
      */
-    public ConfigurationSnapshot(Long revision) {
-        this(revision, null);
+    public ConfigurationSnapshot(Long version) {
+        this(version, null);
     }
 
     /**
      * Creates a new ConfigurationSnapshot.
      *
-     * @param revision The model revision
+     * @param version The revision version
      * @param configuration The configuration
      */
-    public ConfigurationSnapshot(Long revision, T configuration) {
-        this.revision = revision;
+    public ConfigurationSnapshot(Long version, T configuration) {
+        this.version = version;
         this.configuration = configuration;
     }
 
     /**
-     * Get the new model revision.
+     * Get the revision version.
      *
-     * @return The model revision
+     * @return The revision version
      */
-    public Long getRevision() {
-        return revision;
+    public Long getVersion() {
+        return version;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
new file mode 100644
index 0000000..4bde6fe
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web;
+
+/**
+ * 
+ */
+public class FlowModification {
+
+    private final Revision revision;
+    private final String lastModifier;
+
+    public FlowModification(Revision revision, String lastModifier) {
+        this.revision = revision;
+        this.lastModifier = lastModifier;
+    }
+
+    public Revision getRevision() {
+        return revision;
+    }
+
+    public String getLastModifier() {
+        return lastModifier;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
index b045247..455a5d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
@@ -25,71 +25,27 @@ package org.apache.nifi.web;
  */
 public interface OptimisticLockingManager {
 
+    
     /**
-     * Checks the specified revision against the current revision. If the check
-     * succeeds, then the current revision's version is incremented and the
-     * current revision's client ID is set to the given revision's client ID.
-     *
-     * If the given revision's version is null, then the revision's client ID
-     * must match for the current revision's client ID for the check to succeed.
-     *
-     * If the versions and the clientIds do not match, then an
-     * InvalidRevisionException.
-     *
-     * @param revision the revision to check
-     *
-     * @return the current revision
-     *
-     * @throws InvalidRevisionException if the given revision does not match the
-     * current revision
+     * Attempts to execute the specified configuration request using the specified revision within a lock.
+     * 
+     * @param <T>
+     * @param revision
+     * @param configurationRequest
+     * @return 
      */
-    Revision checkRevision(Revision revision) throws InvalidRevisionException;
-
-    /**
-     * Returns true if the given revision matches the current revision.
-     *
-     * @param revision a revision
-     * @return true if given revision is current; false otherwise.
-     */
-    boolean isCurrent(Revision revision);
-
+    <T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> configurationRequest);
+    
     /**
-     * @return the current revision
+     * Updates the revision using the specified revision within a lock.
+     * 
+     * @param updateRevision 
      */
-    Revision getRevision();
+    void setRevision(UpdateRevision updateRevision);
 
     /**
-     * Sets the current revision.
-     *
-     * @param revision a revision
+     * @return the last modification
      */
-    void setRevision(Revision revision);
+    FlowModification getLastModification();
 
-    /**
-     * Increments the current revision's version.
-     *
-     * @return the current revision
-     */
-    Revision incrementRevision();
-
-    /**
-     * Increments the current revision's version and sets the current revision's
-     * client ID to the given client ID.
-     *
-     * @param clientId a client ID
-     * @return the current revision
-     */
-    Revision incrementRevision(String clientId);
-
-    /**
-     * @return the last modifier.
-     */
-    String getLastModifier();
-
-    /**
-     * Sets the last modifier.
-     *
-     * @param lastModifier the last modifier
-     */
-    void setLastModifier(String lastModifier);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java
index 8da6d23..e8a98dd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.web;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.web.security.user.NiFiUserUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Implements the OptimisticLockingManager interface.
  *
@@ -23,55 +31,129 @@ package org.apache.nifi.web;
  */
 public class StandardOptimisticLockingManager implements OptimisticLockingManager {
 
+    private static final Logger logger = LoggerFactory.getLogger(StandardOptimisticLockingManager.class);
+    
+    private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
+    private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";
+    
     private Revision currentRevision = new Revision(0L, "");
-
     private String lastModifier = "unknown";
+    private final Lock lock = new ReentrantLock();
+    
+    private void lock() {
+        lock.lock();
+    }
+    
+    private void unlock() {
+        lock.unlock();
+    }
 
-    @Override
-    public Revision checkRevision(Revision revision) throws InvalidRevisionException {
-        if (currentRevision.equals(revision) == false) {
-            throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision));
-        } else {
-            currentRevision = revision.increment(revision.getClientId());
-            return currentRevision;
+    private void checkRevision(final Revision revision) {
+        final FlowModification lastMod = getLastModification();
+        
+        // with lock, verify revision
+        boolean approved = lastMod.getRevision().equals(revision);
+
+        if (!approved) {
+            logger.debug("Revision check failed because current revision is " + lastMod.getRevision() + " but supplied revision is " + revision);
+            
+            if (lastMod.getRevision().getClientId() == null || lastMod.getRevision().getClientId().trim().isEmpty() || lastMod.getRevision().getVersion() == null) {
+                throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, lastMod.getRevision()));
+            } else {
+                throw new InvalidRevisionException(String.format(SYNC_ERROR, lastMod.getLastModifier()));
+            }
         }
     }
-
-    @Override
-    public boolean isCurrent(Revision revision) {
-        return currentRevision.equals(revision);
+    
+    private Revision updateRevision(final Revision updatedRevision) {
+        // record the current modification
+        setLastModification(new FlowModification(updatedRevision, NiFiUserUtils.getNiFiUserName()));
+        
+        // return the revision
+        return updatedRevision;
     }
 
     @Override
-    public Revision getRevision() {
-        return currentRevision;
-    }
+    public <T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> configurationRequest) {
+        lock();
+        try {
+            // check the revision
+            checkRevision(revision);
 
-    @Override
-    public void setRevision(Revision revision) {
-        currentRevision = revision;
-    }
+            // execute the configuration request
+            final T result = configurationRequest.execute();
 
-    @Override
-    public Revision incrementRevision() {
-        currentRevision = currentRevision.increment();
-        return currentRevision;
-    }
+            // update the revision
+            final Revision newRevision = updateRevision(incrementRevision(revision.getClientId()));
 
-    @Override
-    public Revision incrementRevision(String clientId) {
-        currentRevision = currentRevision.increment(clientId);
-        return currentRevision;
+            // build the result
+            return new ConfigurationSnapshot(newRevision.getVersion(), result);
+        } finally {
+            unlock();
+        }
     }
 
     @Override
-    public String getLastModifier() {
-        return lastModifier;
+    public void setRevision(UpdateRevision updateRevision) {
+        lock();
+        try {
+            final Revision updatedRevision = updateRevision.execute(getLastModification().getRevision());
+            
+            // update the revision
+            if (updatedRevision != null) {
+                updateRevision(updatedRevision);
+            }
+        } finally {
+            unlock();
+        }
     }
-
+    
     @Override
-    public void setLastModifier(String lastModifier) {
-        this.lastModifier = lastModifier;
+    public FlowModification getLastModification() {
+        lock();
+        try {
+            final Revision revision;
+            final ClusterContext ctx = ClusterContextThreadLocal.getContext();
+            if (ctx == null || ctx.getRevision() == null) {
+                revision = currentRevision;
+            } else {
+                revision = ctx.getRevision();
+            }
+            
+            return new FlowModification(revision, lastModifier);
+        } finally {
+            unlock();
+        }
     }
-
+    
+    private void setLastModification(final FlowModification lastModification) {
+        lock();
+        try {
+            // record the last modifier
+            lastModifier = lastModification.getLastModifier();
+            
+            // record the updated revision
+            final ClusterContext ctx = ClusterContextThreadLocal.getContext();
+            if (ctx != null) {
+                ctx.setRevision(lastModification.getRevision());
+            } else {
+                currentRevision = lastModification.getRevision();
+            }
+        } finally {
+            unlock();
+        }
+    }
+    
+    private Revision incrementRevision(String clientId) {
+        final Revision current = getLastModification().getRevision();
+        
+        final long incrementedVersion;
+        if (current.getVersion() == null) {
+            incrementedVersion = 0;
+        } else {
+            incrementedVersion = current.getVersion() + 1;
+        }
+        return new Revision(incrementedVersion, clientId);
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/UpdateRevision.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/UpdateRevision.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/UpdateRevision.java
new file mode 100644
index 0000000..700614f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/UpdateRevision.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web;
+
+/**
+ * 
+ */
+public interface UpdateRevision {
+
+    Revision execute(Revision currentRevision);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/user/NiFiUserUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/user/NiFiUserUtils.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/user/NiFiUserUtils.java
index 0209cf7..acf37dc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/user/NiFiUserUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/user/NiFiUserUtils.java
@@ -79,4 +79,14 @@ public final class NiFiUserUtils {
 
         return user;
     }
+    
+    public static String getNiFiUserName() {
+        // get the nifi user to extract the username
+        NiFiUser user = NiFiUserUtils.getNiFiUser();
+        if (user == null) {
+            return "unknown";
+        } else {
+            return user.getUserName();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index 61b3156..21dc296 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -25,7 +25,7 @@ nf.Settings = (function () {
             controllerConfig: '../nifi-api/controller/config',
             controllerArchive: '../nifi-api/controller/archive',
             controllerServiceTypes: '../nifi-api/controller/controller-service-types',
-            controllerServices: '../nifi-api/controller/controller-services',
+            controllerServices: '../nifi-api/controller/controller-services/node',
             reportingTaskTypes: '../nifi-api/controller/reporting-task-types',
             reportingTasks: '../nifi-api/controller/reporting-tasks'
         }


[3/3] incubator-nifi git commit: NIFI-250: - Refactoring revision checking so that we can lock appropriately on the Cluster Manager to manage controller services running there while other concurrent requests can be replicated amongst the cluster.

Posted by mc...@apache.org.
NIFI-250:
- Refactoring revision checking so that we can lock appropriately on the Cluster Manager to manage controller services running there while other concurrent requests can be replicated amongst the cluster.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4a8da603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4a8da603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4a8da603

Branch: refs/heads/NIFI-250
Commit: 4a8da60334f4b78a123906a6a712d95ec873c687
Parents: 17add53
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Feb 5 15:59:35 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Feb 5 15:59:35 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/web/Revision.java |   32 +-
 .../apache/nifi/web/api/dto/RevisionDTO.java    |   15 +
 .../nifi-framework/nifi-cluster-web/pom.xml     |    4 -
 .../context/ClusterContextThreadLocal.java      |    7 +-
 .../ClusterAwareOptimisticLockingManager.java   |   96 --
 .../nifi-framework/nifi-cluster/pom.xml         |    4 +
 .../cluster/manager/impl/WebClusterManager.java |  179 +--
 .../java/org/apache/nifi/audit/NiFiAuditor.java |   10 +-
 .../nifi/web/StandardNiFiServiceFacade.java     | 1193 ++++++++----------
 .../nifi/web/api/ApplicationResource.java       |   76 +-
 .../apache/nifi/web/api/ClusterResource.java    |    2 +-
 .../apache/nifi/web/api/ConnectionResource.java |    6 +-
 .../apache/nifi/web/api/ControllerResource.java |    9 +-
 .../nifi/web/api/ControllerServiceResource.java |    6 +-
 .../org/apache/nifi/web/api/FunnelResource.java |    6 +-
 .../apache/nifi/web/api/InputPortResource.java  |    6 +-
 .../org/apache/nifi/web/api/LabelResource.java  |    6 +-
 .../apache/nifi/web/api/OutputPortResource.java |    6 +-
 .../nifi/web/api/ProcessGroupResource.java      |   16 +-
 .../apache/nifi/web/api/ProcessorResource.java  |    6 +-
 .../web/api/RemoteProcessGroupResource.java     |   10 +-
 .../apache/nifi/web/api/SnippetResource.java    |    6 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |    8 +-
 .../OptimisticLockingManagerFactoryBean.java    |    3 +-
 .../nifi-web-optimistic-locking/pom.xml         |   12 +
 .../apache/nifi/web/ConfigurationRequest.java   |   26 +
 .../apache/nifi/web/ConfigurationSnapshot.java  |   22 +-
 .../org/apache/nifi/web/FlowModification.java   |   40 +
 .../nifi/web/OptimisticLockingManager.java      |   74 +-
 .../web/StandardOptimisticLockingManager.java   |  150 ++-
 .../org/apache/nifi/web/UpdateRevision.java     |   26 +
 .../nifi/web/security/user/NiFiUserUtils.java   |   10 +
 .../src/main/webapp/js/nf/canvas/nf-settings.js |    2 +-
 33 files changed, 1004 insertions(+), 1070 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/web/Revision.java b/nifi/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
index 1881c2f..8a6275e 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
@@ -37,12 +37,12 @@ public class Revision implements Serializable {
      * the client ID
      */
     private final String clientId;
-
+    
     public Revision(Long revision, String clientId) {
         this.version = revision;
         this.clientId = clientId;
     }
-
+    
     public String getClientId() {
         return clientId;
     }
@@ -51,34 +51,6 @@ public class Revision implements Serializable {
         return version;
     }
 
-    /**
-     * A factory method for creating a new Revision instance whose version is
-     * this instance's version plus 1.
-     *
-     * @return an updated revision
-     */
-    public Revision increment() {
-        final long incrementedVersion;
-        if (version == null) {
-            incrementedVersion = 0;
-        } else {
-            incrementedVersion = version + 1;
-        }
-        return new Revision(incrementedVersion, clientId);
-    }
-
-    /**
-     * A factory method for creating a new Revision instance whose version is
-     * this instance's version plus 1 and whose client ID is the given client
-     * ID.
-     *
-     * @param clientId the client ID
-     * @return an updated revision
-     */
-    public Revision increment(String clientId) {
-        return new Revision(increment().getVersion(), clientId);
-    }
-
     @Override
     public boolean equals(final Object obj) {
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RevisionDTO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RevisionDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RevisionDTO.java
index e608a7e..3327b49 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RevisionDTO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RevisionDTO.java
@@ -26,8 +26,10 @@ public class RevisionDTO {
 
     private String clientId;
     private Long version;
+    private String lastModifier;
 
     /* getters / setters */
+    
     /**
      * A client identifier used to make a request. By including a client
      * identifier, the API can allow multiple requests without needing the
@@ -60,4 +62,17 @@ public class RevisionDTO {
         this.version = version;
     }
 
+    /**
+     * The user that last modified the flow.
+     * 
+     * @return 
+     */
+    public String getLastModifier() {
+        return lastModifier;
+    }
+
+    public void setLastModifier(String lastModifier) {
+        this.lastModifier = lastModifier;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml
index a9e0c12..10511ef 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml
@@ -34,10 +34,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-web-optimistic-locking</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-administration</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
index 012e7c7..c8c7206 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
@@ -32,12 +32,7 @@ public class ClusterContextThreadLocal {
     }
     
     public static ClusterContext getContext() {
-        ClusterContext ctx = contextHolder.get();
-        if(ctx == null) {
-            ctx = createEmptyContext();
-            contextHolder.set(ctx);
-        }
-        return ctx;
+        return contextHolder.get();
     }
     
     public static void setContext(final ClusterContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
deleted file mode 100644
index 90b8a37..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.web;
-
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
-
-/**
- * An optimistic locking manager that provides for optimistic locking in a clustered
- * environment.
- * 
- * @author unattributed
- */
-public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager {
-
-    private final OptimisticLockingManager optimisticLockingManager;
-    
-    public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) {
-        this.optimisticLockingManager = optimisticLockingManager;
-    }
-    
-    @Override
-    public Revision checkRevision(Revision revision) throws InvalidRevisionException {
-        final Revision currentRevision = getRevision();
-        if(currentRevision.equals(revision) == false) {
-            throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision));
-        } else {
-            return revision.increment(revision.getClientId());
-        }
-    }
-
-    @Override
-    public boolean isCurrent(Revision revision) {
-        return getRevision().equals(revision);
-    }
-
-    @Override
-    public Revision getRevision() {
-        final ClusterContext ctx = ClusterContextThreadLocal.getContext();
-        if(ctx == null || ctx.getRevision() == null) {
-            return optimisticLockingManager.getRevision();
-        } else {
-            return ctx.getRevision();
-        }
-    }
-
-    @Override
-    public void setRevision(final Revision revision) {
-        final ClusterContext ctx = ClusterContextThreadLocal.getContext();
-        if(ctx != null) {
-            ctx.setRevision(revision);
-        }
-        optimisticLockingManager.setRevision(revision);
-    }
-
-    @Override
-    public Revision incrementRevision() {
-        final Revision currentRevision = getRevision();
-        final Revision incRevision = currentRevision.increment();
-        setRevision(incRevision);
-        return incRevision;
-    }
-
-    @Override
-    public Revision incrementRevision(final String clientId) {
-        final Revision currentRevision = getRevision();
-        final Revision incRevision = currentRevision.increment(clientId);
-        setRevision(incRevision);
-        return incRevision;
-    }
-
-    @Override
-    public String getLastModifier() {
-        return optimisticLockingManager.getLastModifier();
-    }
-
-    @Override
-    public void setLastModifier(final String lastModifier) {
-        optimisticLockingManager.setLastModifier(lastModifier);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
index 4a75a34..1c9242a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
@@ -47,6 +47,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-optimistic-locking</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-framework-core</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 585d151..499d709 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -210,7 +210,9 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.web.OptimisticLockingManager;
+import org.apache.nifi.web.UpdateRevision;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming
@@ -2133,65 +2135,114 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         // check if this request can change the flow
         final boolean mutableRequest = canChangeNodeState(method, uri);
 
-        // update headers to contain cluster contextual information to send to the node
-        final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        final ClusterContext clusterCtx = new ClusterContextImpl();
-        clusterCtx.setRequestSentByClusterManager(true);                 // indicate request is sent from cluster manager
-        clusterCtx.setRevision(optimisticLockingManager.getRevision());
+        final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null);
+        final UpdateRevision federateRequest = new UpdateRevision() {
+            @Override
+            public Revision execute(Revision currentRevision) {
+                // update headers to contain cluster contextual information to send to the node
+                final Map<String, String> updatedHeaders = new HashMap<>(headers);
+                final ClusterContext clusterCtx = new ClusterContextImpl();
+                clusterCtx.setRequestSentByClusterManager(true);                 // indicate request is sent from cluster manager
+                clusterCtx.setRevision(currentRevision);
+
+                // serialize cluster context and add to request header
+                final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
+                updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+
+                // if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
+                if (mutableRequest) {
+                    updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
 
-        // serialize cluster context and add to request header
-        final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
-        updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+                    final Set<NodeResponse> nodeResponses;
+                    if (entity == null) {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
+                    } else {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
+                    }
 
-        // if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
-        if (mutableRequest) {
-            updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
+                    updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
 
-            final Set<NodeResponse> nodeResponses;
-            if (entity == null) {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
-            } else {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
-            }
+                    for (final NodeResponse response : nodeResponses) {
+                        if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
+                            final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
+                            final ClientResponse clientResponse = response.getClientResponse();
+                            if (clientResponse == null) {
+                                throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
+                            }
+                            final String nodeExplanation = clientResponse.getEntity(String.class);
+                            throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
+                        }
+                    }
 
-            updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
+                    // set flow state to unknown to denote a mutable request replication in progress
+                    logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
+                    notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
+                }
 
-            for (final NodeResponse response : nodeResponses) {
-                if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
-                    final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
-                    final ClientResponse clientResponse = response.getClientResponse();
-                    if (clientResponse == null) {
-                        throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
+                // replicate request
+                final Set<NodeResponse> nodeResponses;
+                try {
+                    if (entity == null) {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
+                    } else {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
+                    }
+                } catch (final UriConstructionException uce) {
+                    // request was not replicated, so mark the flow with its original state
+                    if (mutableRequest) {
+                        notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
                     }
-                    final String nodeExplanation = clientResponse.getEntity(String.class);
-                    throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
-                }
-            }
 
-            // set flow state to unknown to denote a mutable request replication in progress
-            logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
-            notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
-        }
+                    throw uce;
+                }
 
-        // replicate request
-        final Set<NodeResponse> nodeResponses;
-        try {
-            if (entity == null) {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
-            } else {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
-            }
-        } catch (final UriConstructionException uce) {
-            // request was not replicated, so mark the flow with its original state
-            if (mutableRequest) {
-                notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
+                // merge the response
+                final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
+                holder.set(clientResponse);
+                
+                // if we have a response get the updated cluster context for auditing and revision updating
+                Revision updatedRevision = null;
+                if (mutableRequest && clientResponse != null) {
+                    try {
+                        // get the cluster context from the response header
+                        final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
+                        if (StringUtils.isNotBlank(serializedClusterContext)) {
+                            // deserialize object
+                            final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
+
+                            // if we have a valid object, audit the actions
+                            if (clusterContextObj instanceof ClusterContext) {
+                                final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
+                                if (auditService != null) {
+                                    try {
+                                        auditService.addActions(clusterContext.getActions());
+                                    } catch (Throwable t) {
+                                        logger.warn("Unable to record actions: " + t.getMessage());
+                                        if (logger.isDebugEnabled()) {
+                                            logger.warn(StringUtils.EMPTY, t);
+                                        }
+                                    }
+                                }
+                                updatedRevision = clusterContext.getRevision();
+                            }
+                        }
+                    } catch (final ClassNotFoundException cnfe) {
+                        logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
+                    }
+                }
+                
+                return updatedRevision;
             }
-
-            throw uce;
+        };
+        
+        // federate the request and lock on the revision
+        if (mutableRequest) {
+            optimisticLockingManager.setRevision(federateRequest);
+        } else {
+            federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
         }
-
-        final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
-        return clientResponse;
+        
+        return holder.get();
     }
 
     private static boolean isProcessorsEndpoint(final URI uri, final String method) {
@@ -2783,36 +2834,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uri);
                 }
             }
-
-            // if at least one node satisfied the request, then audit the action 
-            if (hasClientResponse) {
-                try {
-                    // get the cluster context from the response header
-                    final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
-                    if (StringUtils.isNotBlank(serializedClusterContext)) {
-                        // deserialize object
-                        final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
-
-                        // if we have a valid object, audit the actions
-                        if (clusterContextObj instanceof ClusterContext) {
-                            final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
-                            if (auditService != null) {
-                                try {
-                                    auditService.addActions(clusterContext.getActions());
-                                } catch (Throwable t) {
-                                    logger.warn("Unable to record actions: " + t.getMessage());
-                                    if (logger.isDebugEnabled()) {
-                                        logger.warn(StringUtils.EMPTY, t);
-                                    }
-                                }
-                            }
-                            optimisticLockingManager.setRevision(clusterContext.getRevision());
-                        }
-                    }
-                } catch (final ClassNotFoundException cnfe) {
-                    logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
-                }
-            }
         }
 
         return clientResponse;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
index 231f3e3..adff9d1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
@@ -57,12 +57,10 @@ public abstract class NiFiAuditor {
      * @param logger
      */
     protected void saveActions(Collection<Action> actions, Logger logger) {
-        /*
-         * if we're a clustered node, then set actions on threadlocal
-         */
-        if (serviceFacade.isClustered()) {
-            // if we're a connected node, then put audit actions on threadlocal to propagate back to manager
-            ClusterContext ctx = ClusterContextThreadLocal.getContext();
+        ClusterContext ctx = ClusterContextThreadLocal.getContext();
+        
+        // if we're a connected node, then put audit actions on threadlocal to propagate back to manager
+        if (ctx != null) {
             ctx.getActions().addAll(actions);
         } else {
             // if we're the cluster manager, or a disconnected node, or running standalone, then audit actions


[2/3] incubator-nifi git commit: NIFI-250: - Refactoring revision checking so that we can lock appropriately on the Cluster Manager to manage controller services running there while other concurrent requests can be replicated amongst the cluster.

Posted by mc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index f953585..f56d787 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -166,8 +166,6 @@ import org.springframework.security.access.AccessDeniedException;
 public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
-    private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
-    private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";
 
     // nifi core components
     private ControllerFacade controllerFacade;
@@ -200,51 +198,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     private NiFiProperties properties;
     private DtoFactory dtoFactory;
 
-    /**
-     * Checks the specified revision against the current revision.
-     *
-     * @param revision The revision to check
-     * @param clientId The client id
-     * @return Whether or not the request should proceed
-     * @throws NiFiCoreException If the specified revision is not current
-     */
-    private void checkRevision(Revision revision) {
-
-        boolean approved = optimisticLockingManager.isCurrent(revision);
-
-        if (!approved) {
-            Revision currentRevision = optimisticLockingManager.getRevision();
-            logger.debug("Revision check failed because current revision is " + currentRevision + " but supplied revision is " + revision);
-
-            if (StringUtils.isBlank(currentRevision.getClientId()) || currentRevision.getVersion() == null) {
-                throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, currentRevision));
-            } else {
-                throw new InvalidRevisionException(String.format(SYNC_ERROR, optimisticLockingManager.getLastModifier()));
-            }
-        }
-    }
-
-    /**
-     * Increments the revision and updates the last modifier.
-     *
-     * @param revision
-     * @return
-     */
-    private Revision updateRevision(Revision revision) {
-        // update the client id and modifier
-        final Revision updatedRevision = optimisticLockingManager.incrementRevision(revision.getClientId());
-
-        // get the nifi user to extract the username
-        NiFiUser user = NiFiUserUtils.getNiFiUser();
-        if (user == null) {
-            optimisticLockingManager.setLastModifier("unknown");
-        } else {
-            optimisticLockingManager.setLastModifier(user.getUserName());
-        }
-
-        return updatedRevision;
-    }
-
     // -----------------------------------------
     // Verification Operations
     // -----------------------------------------
@@ -381,96 +334,85 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     // -----------------------------------------
     
     @Override
-    public ConfigurationSnapshot<ConnectionDTO> updateConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ConnectionDTO> updateConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
         // if connection does not exist, then create new connection
         if (connectionDAO.hasConnection(groupId, connectionDTO.getId()) == false) {
             return createConnection(revision, groupId, connectionDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+            @Override
+            public ConnectionDTO execute() {
+                final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
 
-        final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                controllerFacade.save();
+                
+                return dtoFactory.createConnectionDto(connection);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessorDTO> updateProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ProcessorDTO> updateProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
         // if processor does not exist, then create new processor
         if (processorDAO.hasProcessor(groupId, processorDTO.getId()) == false) {
             return createProcessor(revision, groupId, processorDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+            @Override
+            public ProcessorDTO execute() {
+                // update the processor
+                ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
 
-        // update the processor
-        ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessorDto(processor);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<LabelDTO> updateLabel(Revision revision, String groupId, LabelDTO labelDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<LabelDTO> updateLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
         // if label does not exist, then create new label
         if (labelDAO.hasLabel(groupId, labelDTO.getId()) == false) {
             return createLabel(revision, groupId, labelDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+            @Override
+            public LabelDTO execute() {
+                // update the existing label
+                final Label label = labelDAO.updateLabel(groupId, labelDTO);
 
-        // update the existing label
-        final Label label = labelDAO.updateLabel(groupId, labelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createLabelDto(label);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<FunnelDTO> updateFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<FunnelDTO> updateFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
         // if label does not exist, then create new label
         if (funnelDAO.hasFunnel(groupId, funnelDTO.getId()) == false) {
             return createFunnel(revision, groupId, funnelDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+            @Override
+            public FunnelDTO execute() {
+                // update the existing label
+                final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
 
-        // update the existing label
-        final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createFunnelDto(funnel);
+            }
+        });
     }
 
     @Override
@@ -483,141 +425,126 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<SnippetDTO> updateSnippet(Revision revision, SnippetDTO snippetDto) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<SnippetDTO> updateSnippet(final Revision revision, final SnippetDTO snippetDto) {
         // if label does not exist, then create new label
         if (snippetDAO.hasSnippet(snippetDto.getId()) == false) {
             return createSnippet(revision, snippetDto);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+            @Override
+            public SnippetDTO execute() {
+                // update the snippet
+                final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
 
-        // update the snippet
-        final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
-
-        // build the snippet dto
-        final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
-        responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDto);
-
-        // save updated controller if applicable
-        if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
-            controllerFacade.save();
-        }
+                // build the snippet dto
+                final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
+                responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
 
-        return response;
+                // save updated controller if applicable
+                if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
+                    controllerFacade.save();
+                }
+                
+                return responseSnippetDto;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> updateInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<PortDTO> updateInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
         // if input port does not exist, then create new input port
         if (inputPortDAO.hasPort(groupId, inputPortDTO.getId()) == false) {
             return createInputPort(revision, groupId, inputPortDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
 
-        final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return  dtoFactory.createPortDto(inputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> updateOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<PortDTO> updateOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
         // if output port does not exist, then create new output port
         if (outputPortDAO.hasPort(groupId, outputPortDTO.getId()) == false) {
             return createOutputPort(revision, groupId, outputPortDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
 
-        final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createPortDto(outputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
         // if controller reference does not exist, then create new controller reference
         if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId()) == false) {
             return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+            @Override
+            public RemoteProcessGroupDTO execute() {
+                RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
 
-        RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // update the remote port
-        RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+            @Override
+            public RemoteProcessGroupPortDTO execute() {
+                // update the remote port
+                RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
 
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // update the remote port
-        RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
+    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+            @Override
+            public RemoteProcessGroupPortDTO execute() {
+                // update the remote port
+                RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(Revision revision, String parentGroupId, ProcessGroupDTO processGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
         // if process group does not exist, then create new process group
         if (processGroupDAO.hasProcessGroup(processGroupDTO.getId()) == false) {
             if (parentGroupId == null) {
@@ -626,50 +553,49 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                 return createProcessGroup(parentGroupId, revision, processGroupDTO);
             }
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+            @Override
+            public ProcessGroupDTO execute() {
+                // update the process group
+                ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
 
-        // update the process group
-        ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessGroupDto(processGroup);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // update the controller configuration through the proxy
-        if (controllerConfigurationDTO.getName() != null) {
-            controllerFacade.setName(controllerConfigurationDTO.getName());
-        }
-        if (controllerConfigurationDTO.getComments() != null) {
-            controllerFacade.setComments(controllerConfigurationDTO.getComments());
-        }
-        if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
-            controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
-        }
-        if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
-            controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
-        }
-
-        // create the controller configuration dto
-        ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ControllerConfigurationDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), controllerConfig);
+    public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerConfigurationDTO>() {
+            @Override
+            public ControllerConfigurationDTO execute() {
+                // update the controller configuration through the proxy
+                if (controllerConfigurationDTO.getName() != null) {
+                    controllerFacade.setName(controllerConfigurationDTO.getName());
+                }
+                if (controllerConfigurationDTO.getComments() != null) {
+                    controllerFacade.setComments(controllerConfigurationDTO.getComments());
+                }
+                if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
+                    controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
+                }
+                if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
+                    controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // create the controller configuration dto
+                ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return controllerConfig;
+            }
+        });
     }
 
     @Override
@@ -702,74 +628,66 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
+    public ConfigurationSnapshot<Void> deleteConnection(final Revision revision, final String groupId, final String connectionId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>(){
+            @Override
+            public Void execute() {
+                connectionDAO.deleteConnection(groupId, connectionId);
 
-        connectionDAO.deleteConnection(groupId, connectionId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteProcessor(Revision revision, String groupId, String processorId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the processor and synchronize the connection state
-        processorDAO.deleteProcessor(groupId, processorId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the processor and synchronize the connection state
+                processorDAO.deleteProcessor(groupId, processorId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteLabel(Revision revision, String groupId, String labelId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the label
-        labelDAO.deleteLabel(groupId, labelId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
+    public ConfigurationSnapshot<Void> deleteLabel(final Revision revision, final String groupId, final String labelId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the label
+                labelDAO.deleteLabel(groupId, labelId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteFunnel(Revision revision, String groupId, String funnelId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the label
-        funnelDAO.deleteFunnel(groupId, funnelId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
+    public ConfigurationSnapshot<Void> deleteFunnel(final Revision revision, final String groupId, final String funnelId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the label
+                funnelDAO.deleteFunnel(groupId, funnelId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
@@ -778,95 +696,85 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteSnippet(Revision revision, String snippetId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // determine if this snippet was linked to the data flow
-        Snippet snippet = snippetDAO.getSnippet(snippetId);
-        boolean linked = snippet.isLinked();
+    public ConfigurationSnapshot<Void> deleteSnippet(final Revision revision, final String snippetId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // determine if this snippet was linked to the data flow
+                Snippet snippet = snippetDAO.getSnippet(snippetId);
+                boolean linked = snippet.isLinked();
 
-        // delete the snippet
-        snippetDAO.deleteSnippet(snippetId);
+                // delete the snippet
+                snippetDAO.deleteSnippet(snippetId);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow if necessary
-        if (linked) {
-            controllerFacade.save();
-        }
-
-        return response;
+                // save the flow if necessary
+                if (linked) {
+                    controllerFacade.save();
+                }
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteInputPort(Revision revision, String groupId, String inputPortId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        inputPortDAO.deletePort(groupId, inputPortId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
+    public ConfigurationSnapshot<Void> deleteInputPort(final Revision revision, final String groupId, final String inputPortId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                inputPortDAO.deletePort(groupId, inputPortId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteOutputPort(Revision revision, String groupId, String outputPortId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        outputPortDAO.deletePort(groupId, outputPortId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteOutputPort(final Revision revision, final String groupId, final String outputPortId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                outputPortDAO.deletePort(groupId, outputPortId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteProcessGroup(Revision revision, String groupId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        processGroupDAO.deleteProcessGroup(groupId);
+    public ConfigurationSnapshot<Void> deleteProcessGroup(final Revision revision, final String groupId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                processGroupDAO.deleteProcessGroup(groupId);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(Revision revision, String groupId, String remoteProcessGroupId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
+    public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(final Revision revision, final String groupId, final String remoteProcessGroupId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
@@ -876,97 +784,86 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(connectionDTO.getId())) {
-            connectionDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
+    public ConfigurationSnapshot<ConnectionDTO> createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+            @Override
+            public ConnectionDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(connectionDTO.getId())) {
+                    connectionDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createConnectionDto(connection);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessorDTO> createProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(processorDTO.getId())) {
-            processorDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // create the processor
-        final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+    public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+            @Override
+            public ProcessorDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(processorDTO.getId())) {
+                    processorDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // create the processor
+                final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessorDto(processor);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<LabelDTO> createLabel(Revision revision, String groupId, LabelDTO labelDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(labelDTO.getId())) {
-            labelDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // add the label
-        final Label label = labelDAO.createLabel(groupId, labelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
+    public ConfigurationSnapshot<LabelDTO> createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+            @Override
+            public LabelDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(labelDTO.getId())) {
+                    labelDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // add the label
+                final Label label = labelDAO.createLabel(groupId, labelDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createLabelDto(label);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<FunnelDTO> createFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(funnelDTO.getId())) {
-            funnelDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // add the label
-        final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
+    public ConfigurationSnapshot<FunnelDTO> createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+            @Override
+            public FunnelDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(funnelDTO.getId())) {
+                    funnelDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // add the label
+                final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createFunnelDto(funnel);
+            }
+        });
     }
 
     private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) {
@@ -1025,139 +922,129 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(Revision revision, String groupId, String snippetId, Double originX, Double originY) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(snippetId)) {
-            snippetId = UUID.randomUUID().toString();
-        }
-
-        // create the new snippet
-        FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
-
-        // validate the new snippet
-        validateSnippetContents(flowSnippet, groupId);
+    public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+            @Override
+            public FlowSnippetDTO execute() {
+                String id = snippetId;
+                
+                // ensure id is set
+                if (StringUtils.isBlank(id)) {
+                    id = UUID.randomUUID().toString();
+                }
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+                // create the new snippet
+                FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, id, originX, originY);
 
-        // save the flow
-        controllerFacade.save();
+                // validate the new snippet
+                validateSnippetContents(flowSnippet, groupId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return flowSnippet;
+            }
+        });
     }
 
     @Override
     public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(snippetDTO.getId())) {
-            snippetDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // add the snippet
-        final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
-        final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
-        responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
-        // create the response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDTO);
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+            @Override
+            public SnippetDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(snippetDTO.getId())) {
+                    snippetDTO.setId(UUID.randomUUID().toString());
+                }
 
-        return response;
+                // add the snippet
+                final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
+                final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
+                responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+                
+                return responseSnippetDTO;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> createInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(inputPortDTO.getId())) {
-            inputPortDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
+    public ConfigurationSnapshot<PortDTO> createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(inputPortDTO.getId())) {
+                    inputPortDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createPortDto(inputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> createOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(outputPortDTO.getId())) {
-            outputPortDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
+    public ConfigurationSnapshot<PortDTO> createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(outputPortDTO.getId())) {
+                    outputPortDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createPortDto(outputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(String parentGroupId, Revision revision, ProcessGroupDTO processGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(processGroupDTO.getId())) {
-            processGroupDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
+    public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+            @Override
+            public ProcessGroupDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(processGroupDTO.getId())) {
+                    processGroupDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessGroupDto(processGroup);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
-            remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+    public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+            @Override
+            public RemoteProcessGroupDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
+                    remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+            }
+        });
     }
 
     @Override
@@ -1203,148 +1090,137 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(Revision revision, String groupId, Double originX, Double originY, String templateId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
-        // was copied and this dto is only used to instantiate it's components (which as already completed)
-        FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
+    public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+            @Override
+            public FlowSnippetDTO execute() {
+                // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
+                // was copied and this dto is only used to instantiate it's components (which as already completed)
+                FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
 
-        // validate the new snippet
-        validateSnippetContents(flowSnippet, groupId);
+                // validate the new snippet
+                validateSnippetContents(flowSnippet, groupId);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return flowSnippet;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> createArchive(Revision revision) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // create the archive
-        controllerFacade.createArchive();
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-        return response;
+    public ConfigurationSnapshot<Void> createArchive(final Revision revision) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // create the archive
+                controllerFacade.createArchive();
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(Revision revision, String processorId, String annotationData) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // create the processor config
-        final ProcessorConfigDTO config = new ProcessorConfigDTO();
-        config.setAnnotationData(annotationData);
+    public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+            @Override
+            public ProcessorDTO execute() {
+                // create the processor config
+                final ProcessorConfigDTO config = new ProcessorConfigDTO();
+                config.setAnnotationData(annotationData);
 
-        // create the processor dto
-        final ProcessorDTO processorDTO = new ProcessorDTO();
-        processorDTO.setId(processorId);
-        processorDTO.setConfig(config);
+                // create the processor dto
+                final ProcessorDTO processorDTO = new ProcessorDTO();
+                processorDTO.setId(processorId);
+                processorDTO.setConfig(config);
 
-        // get the parent group id for the specified processor
-        String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+                // get the parent group id for the specified processor
+                String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
 
-        // ensure the parent group id was found
-        if (groupId == null) {
-            throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
-        }
-
-        // update the processor configuration
-        ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+                // ensure the parent group id was found
+                if (groupId == null) {
+                    throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+                }
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+                // update the processor configuration
+                ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
 
-        // save the flow
-        controllerFacade.save();
+                // save the flow
+                controllerFacade.save();
 
-        return response;
+                return dtoFactory.createProcessorDto(processor);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(Revision revision, ControllerServiceDTO controllerServiceDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(controllerServiceDTO.getId())) {
-            controllerServiceDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ControllerServiceDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createControllerServiceDto(controllerService));
+    public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+            @Override
+            public ControllerServiceDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(controllerServiceDTO.getId())) {
+                    controllerServiceDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the update
-        if (properties.isClusterManager()) {
-            clusterManager.saveControllerServices();
-        } else {
-            controllerFacade.save();
-        }
+                // create the controller service
+                final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
 
-        return response;
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveControllerServices();
+                } else {
+                    controllerFacade.save();
+                }
+                
+                return dtoFactory.createControllerServiceDto(controllerService);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(Revision revision, ControllerServiceDTO controllerServiceDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
         // if controller service does not exist, then create new controller service
         if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) {
             return createControllerService(revision, controllerServiceDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+            @Override
+            public ControllerServiceDTO execute() {
+                final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
 
-        final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ControllerServiceDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createControllerServiceDto(controllerService));
-
-        // save the update
-        if (properties.isClusterManager()) {
-            clusterManager.saveControllerServices();
-        } else {
-            controllerFacade.save();
-        }
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveControllerServices();
+                } else {
+                    controllerFacade.save();
+                }
 
-        return response;
+                return dtoFactory.createControllerServiceDto(controllerService);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteControllerService(Revision revision, String controllerServiceId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the label
-        controllerServiceDAO.deleteControllerService(controllerServiceId);
+    public ConfigurationSnapshot<Void> deleteControllerService(final Revision revision, final String controllerServiceId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the label
+                controllerServiceDAO.deleteControllerService(controllerServiceId);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the update
-        if (properties.isClusterManager()) {
-            clusterManager.saveControllerServices();
-        } else {
-            controllerFacade.save();
-        }
-
-        return response;
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveControllerServices();
+                } else {
+                    controllerFacade.save();
+                }
+                
+                return null;
+            }
+        });
     }
 
     @Override
@@ -1499,9 +1375,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     // -----------------------------------------
     // Read Operations
     // -----------------------------------------
+    
     @Override
     public RevisionDTO getRevision() {
-        return dtoFactory.createRevisionDTO(optimisticLockingManager.getRevision());
+        return dtoFactory.createRevisionDTO(optimisticLockingManager.getLastModification());
     }
 
     @Override
@@ -2031,8 +1908,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     @Override
     public ConfigurationSnapshot<ProcessGroupDTO> getProcessGroup(String groupId, final boolean recurse) {
         ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
-        Long version = optimisticLockingManager.getRevision().getVersion();
-        ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(version, dtoFactory.createProcessGroupDto(processGroup, recurse));
+        Revision revision = optimisticLockingManager.getLastModification().getRevision();
+        ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(revision.getVersion(), dtoFactory.createProcessGroupDto(processGroup, recurse));
         return response;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 1b9ae7d..787fffa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -178,53 +178,55 @@ public abstract class ApplicationResource {
 
         // get cluster context from threadlocal
         ClusterContext clusterCtx = ClusterContextThreadLocal.getContext();
+        if (clusterCtx != null) {
+            
+            // serialize cluster context
+            String serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
+            if (serializedClusterContext.length() > CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES) {
+                /*
+                 * Actions is the only field that can vary in size. If we have no
+                 * actions and we exceeded the header size, then basic assumptions
+                 * about the cluster context have been violated.
+                 */
+                if (clusterCtx.getActions().isEmpty()) {
+                    throw new IllegalStateException(
+                            String.format("Serialized Cluster context size '%d' is too big for response header", serializedClusterContext.length()));
+                }
 
-        // serialize cluster context
-        String serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
-        if (serializedClusterContext.length() > CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES) {
-            /*
-             * Actions is the only field that can vary in size. If we have no
-             * actions and we exceeded the header size, then basic assumptions
-             * about the cluster context have been violated.
-             */
-            if (clusterCtx.getActions().isEmpty()) {
-                throw new IllegalStateException(
-                        String.format("Serialized Cluster context size '%d' is too big for response header", serializedClusterContext.length()));
-            }
+                // use the first action as the prototype for creating the "batch" action
+                Action prototypeAction = clusterCtx.getActions().get(0);
 
-            // use the first action as the prototype for creating the "batch" action
-            Action prototypeAction = clusterCtx.getActions().get(0);
+                // log the batched actions
+                StringBuilder loggedActions = new StringBuilder();
+                createBatchedActionLogStatement(loggedActions, clusterCtx.getActions());
+                logger.info(loggedActions.toString());
 
-            // log the batched actions
-            StringBuilder loggedActions = new StringBuilder();
-            createBatchedActionLogStatement(loggedActions, clusterCtx.getActions());
-            logger.info(loggedActions.toString());
+                // remove current actions and replace with batch action
+                clusterCtx.getActions().clear();
 
-            // remove current actions and replace with batch action
-            clusterCtx.getActions().clear();
+                // create the batch action
+                Action batchAction = new Action();
+                batchAction.setOperation(Operation.Batch);
 
-            // create the batch action
-            Action batchAction = new Action();
-            batchAction.setOperation(Operation.Batch);
+                // copy values from prototype action 
+                batchAction.setTimestamp(prototypeAction.getTimestamp());
+                batchAction.setUserDn(prototypeAction.getUserDn());
+                batchAction.setUserName(prototypeAction.getUserName());
+                batchAction.setSourceId(prototypeAction.getSourceId());
+                batchAction.setSourceName(prototypeAction.getSourceName());
+                batchAction.setSourceType(prototypeAction.getSourceType());
 
-            // copy values from prototype action 
-            batchAction.setTimestamp(prototypeAction.getTimestamp());
-            batchAction.setUserDn(prototypeAction.getUserDn());
-            batchAction.setUserName(prototypeAction.getUserName());
-            batchAction.setSourceId(prototypeAction.getSourceId());
-            batchAction.setSourceName(prototypeAction.getSourceName());
-            batchAction.setSourceType(prototypeAction.getSourceType());
+                // add batch action
+                clusterCtx.getActions().add(batchAction);
 
-            // add batch action
-            clusterCtx.getActions().add(batchAction);
+                // create the final serialized copy of the cluster context
+                serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
+            }
 
-            // create the final serialized copy of the cluster context
-            serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
+            // put serialized cluster context in response header
+            response.header(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterContext);
         }
 
-        // put serialized cluster context in response header
-        response.header(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterContext);
-
         return response;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index e87f388..3a74782 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -379,7 +379,7 @@ public class ClusterResource extends ApplicationResource {
             // update the revision
             RevisionDTO updatedRevision = new RevisionDTO();
             updatedRevision.setClientId(revision.getClientId());
-            updatedRevision.setVersion(controllerResponse.getRevision());
+            updatedRevision.setVersion(controllerResponse.getVersion());
 
             // generate the response entity
             final ProcessorEntity entity = new ProcessorEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index a941444..5d233f7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -450,7 +450,7 @@ public class ConnectionResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         ConnectionEntity entity = new ConnectionEntity();
@@ -684,7 +684,7 @@ public class ConnectionResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         ConnectionEntity entity = new ConnectionEntity();
@@ -742,7 +742,7 @@ public class ConnectionResource extends ApplicationResource {
         // create the revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(clientId.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ConnectionEntity entity = new ConnectionEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index a77e9ea..f8c539d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -315,7 +315,7 @@ public class ControllerResource extends ApplicationResource {
         // create the revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(clientId.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ProcessGroupEntity controllerEntity = new ProcessGroupEntity();
@@ -337,11 +337,6 @@ public class ControllerResource extends ApplicationResource {
     @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
     @TypeHint(Entity.class)
     public Response getRevision() {
-        // replicate if cluster manager
-        if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
-        }
-
         // create the current revision
         final RevisionDTO revision = serviceFacade.getRevision();
 
@@ -607,7 +602,7 @@ public class ControllerResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
         final ControllerConfigurationEntity entity = new ControllerConfigurationEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 4b36f28..bccf218 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -281,7 +281,7 @@ public class ControllerServiceResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final ControllerServiceEntity entity = new ControllerServiceEntity();
@@ -502,7 +502,7 @@ public class ControllerServiceResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final ControllerServiceEntity entity = new ControllerServiceEntity();
@@ -563,7 +563,7 @@ public class ControllerServiceResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final ControllerServiceEntity entity = new ControllerServiceEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
index 4406c2e..3492de2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
@@ -244,7 +244,7 @@ public class FunnelResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final FunnelEntity entity = new FunnelEntity();
@@ -408,7 +408,7 @@ public class FunnelResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final FunnelEntity entity = new FunnelEntity();
@@ -465,7 +465,7 @@ public class FunnelResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final FunnelEntity entity = new FunnelEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
index 58c3c9e..f3a6326 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
@@ -251,7 +251,7 @@ public class InputPortResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final InputPortEntity entity = new InputPortEntity();
@@ -446,7 +446,7 @@ public class InputPortResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final InputPortEntity entity = new InputPortEntity();
@@ -503,7 +503,7 @@ public class InputPortResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final InputPortEntity entity = new InputPortEntity();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
index 9a61cfc..6435671 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
@@ -260,7 +260,7 @@ public class LabelResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final LabelEntity entity = new LabelEntity();
@@ -463,7 +463,7 @@ public class LabelResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
-        updatedRevision.setVersion(controllerResponse.getRevision());
+        updatedRevision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final LabelEntity entity = new LabelEntity();
@@ -519,7 +519,7 @@ public class LabelResource extends ApplicationResource {
         // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getRevision());
+        revision.setVersion(controllerResponse.getVersion());
 
         // build the response entity
         final LabelEntity entity = new LabelEntity();