You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/06 18:36:48 UTC

[4/4] nifi git commit: NIFI-2170: Refactor RevisionManager into a RevisionManager and a DistributedLockingManager. This closes #610

NIFI-2170: Refactor RevisionManager into a RevisionManager and a DistributedLockingManager. This closes #610


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f4c94e34
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f4c94e34
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f4c94e34

Branch: refs/heads/master
Commit: f4c94e349cdf819aa5e16957ff1c6897c469dc50
Parents: 181386b
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jul 5 13:02:03 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Jul 6 14:36:12 2016 -0400

----------------------------------------------------------------------
 .../http/replication/RequestReplicator.java     |    6 +-
 .../ThreadPoolRequestReplicator.java            |   17 +-
 .../nifi/web/concurrent/DistributedLock.java    |  111 ++
 .../concurrent/DistributedLockingManager.java   |   71 ++
 .../web/concurrent/LockExpiredException.java    |   26 +
 .../nifi/web/revision/RevisionManager.java      |  127 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  136 ++-
 .../nifi/web/StandardNiFiServiceFacade.java     | 1108 +++++++-----------
 .../StandardNiFiWebConfigurationContext.java    |   58 +-
 .../nifi/web/api/ApplicationResource.java       |  141 ++-
 .../api/config/LockExpiredExceptionMapper.java  |   44 +
 .../src/main/resources/nifi-web-api-context.xml |    7 +-
 .../concurrent/DistributedReadWriteLock.java    |   49 +
 .../apache/nifi/web/concurrent/LockInfo.java    |   55 +
 .../apache/nifi/web/concurrent/LockMode.java    |   26 +
 .../nifi/web/concurrent/ReadWriteLockSync.java  |   32 +
 .../concurrent/ReentrantDistributedLock.java    |  174 +++
 .../nifi/web/revision/NaiveRevisionManager.java |  710 +----------
 .../TestReentrantDistributedLock.java           |  216 ++++
 .../web/revision/TestNaiveRevisionManager.java  |  660 -----------
 20 files changed, 1548 insertions(+), 2226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index a4b762a..fad7454 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -38,7 +38,11 @@ public interface RequestReplicator {
     public static final String NODE_CONTINUE = "150-NodeContinue";
     public static final int NODE_CONTINUE_STATUS_CODE = 150;
 
-    public static final String CLAIM_CANCEL_HEADER = "X-Cancel-Claim";
+    /**
+     * Indicates that the request is intended to cancel a lock that was previously obtained without performing the action
+     */
+    public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock";
+    public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
 
     /**
      * Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 3d90b6f..7a9b56f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -325,6 +325,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
         logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
 
+        // Add the Lock Version ID to the headers so that it is used in all requests for this transaction
+        final String lockVersionId = UUID.randomUUID().toString();
+        headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId);
+
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
         updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
 
@@ -361,20 +365,21 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                             return;
                         }
 
-                        final Thread cancelClaimThread = new Thread(new Runnable() {
+                        final Map<String, String> cancelLockHeaders = new HashMap<>(updatedHeaders);
+                        cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true");
+                        final Thread cancelLockThread = new Thread(new Runnable() {
                             @Override
                             public void run() {
                                 logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
-                                updatedHeaders.put(CLAIM_CANCEL_HEADER, "true");
 
                                 final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
-                                    nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, null);
+                                    nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
 
-                                replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
+                                replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
                             }
                         });
-                        cancelClaimThread.setName("Cancel Claims");
-                        cancelClaimThread.start();
+                        cancelLockThread.setName("Cancel Flow Locks");
+                        cancelLockThread.start();
 
                         // Add a NodeResponse for each node to the Cluster Response
                         // Check that all nodes responded successfully.

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
new file mode 100644
index 0000000..b5c974f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public interface DistributedLock {
+
+    /**
+     * Obtains a lock, blocking as long as necessary to obtain the lock.
+     * Once a lock has been obtained, the identifier of the version of the lock is returned,
+     * which can be passed to the {@link #withLock(String, Callable)} or
+     * {@link #unlock(String)} method. Once this method returns, it is
+     * important that either the {@link #withLock(String, Callable)} or
+     * {@link #unlock(String)} method be called with this identifier. Otherwise,
+     * any attempt to claim another read lock or write lock will block until this
+     * lock expires.
+     *
+     * @return the identifier
+     */
+    String lock();
+
+    /**
+     * Obtains a lock, blocking as long as necessary to obtain the lock.
+     * Once a lock has been obtained, the identifier of the version of the lock is returned,
+     * which can be passed to the {@link #withLock(String, Callable)} or
+     * {@link #unlock(String)} method. Once this method returns, it is
+     * important that either the {@link #withLock(String, Callable)} or
+     * {@link #unlock(String)} method be called with this identifier. Otherwise,
+     * any attempt to claim another read lock or write lock will block until this
+     * lock expires.
+     *
+     * @param versionIdentifier a value that should be used as the version id instead of generating one.
+     *            This allows us to ensure that all nodes in the cluster use the same id.
+     *
+     * @return the identifier
+     */
+    String lock(String versionIdentifier);
+
+    /**
+     * Waits up to the given amount of time to obtain a lock. If the lock is obtained
+     * within this time period, the identifier will be returned, as with {@link #lock()}.
+     * If the lock cannot be obtained within the given time period, <code>null</code> will
+     * be returned.
+     *
+     * @param time the maximum amount of time to wait for the lock
+     * @param timeUnit the unit of time that the time parameter is in
+     * @return the identifier of the lock, or <code>null</code> if no lock is obtained
+     */
+    String tryLock(long time, TimeUnit timeUnit);
+
+    /**
+     * Waits up to the given amount of time to obtain a lock. If the lock is obtained
+     * within this time period, the identifier will be returned, as with {@link #lock()}.
+     * If the lock cannot be obtained within the given time period, <code>null</code> will
+     * be returned.
+     *
+     * @param time the maximum amount of time to wait for the lock
+     * @param timeUnit the unit of time that the time parameter is in
+     * @param versionIdentifier a value that should be used as the version id instead of generating one.
+     *            This allows us to ensure that all nodes in the cluster use the same id.
+     * @return the identifier of the lock, or <code>null</code> if no lock is obtained
+     */
+    String tryLock(long time, TimeUnit timeUnit, String versionIdentifier);
+
+    /**
+     * Performs the given action while this lock is held. The identifier of the lock that was
+     * obtained by calling {@link #lock()} must be provided to this method. If the
+     * lock identifier is incorrect, or the lock has expired, a {@link LockExpiredException}
+     * will be thrown. This method provides a mechanism for verifying that the lock obtained
+     * by {@link #lock()} or {@link #tryLock(long, TimeUnit)} is still valid and that the action
+     * being performed will be done so without the lock expiring (i.e., if the lock expires while
+     * the action is being performed, the lock won't be released until the provided action completes).
+     *
+     * @param identifier the identifier of the lock that has already been obtained
+     * @param action the action to perform
+     *
+     * @return the value returned by the given action
+     *
+     * @throws LockExpiredException if the provided identifier is not the identifier of the currently
+     *             held lock, or if the lock that was obtained has already expired and is no longer valid
+     */
+    <T> T withLock(String identifier, Supplier<T> action) throws LockExpiredException;
+
+    /**
+     * Cancels the lock with the given identifier, so that the lock is no longer valid.
+     *
+     * @param identifier the identifier of the lock that was obtained by calling {@link #lock()}.
+     *
+     * @throws LockExpiredException if the provided identifier is not the identifier of the currently
+     *             held lock, or if the lock that was obtained has already expired and is no longer valid
+     */
+    void unlock(String identifier) throws LockExpiredException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
new file mode 100644
index 0000000..09c1d5d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+/**
+ * <p>
+ * A DistributedLockingManager is responsible for exposing a mechanism that
+ * clients can use to obtain a lock on the dataflow.
+ * </p>
+ *
+ * <p>
+ * Because of the way in which NiFi replicates requests from one node to all
+ * other nodes in the cluster, it is important that all nodes in the cluster
+ * are able to obtain a lock for the request before the request is allowed to
+ * proceed. This is accomplished by using a two-phase approach. For each request
+ * that will require a lock (either a read (shared) lock or a write (mutually
+ * exclusive) lock), the request must be done in two phases. The first phase is
+ * responsible for obtaining the lock and optionally performing validation of
+ * the request. Once a node has obtained the necessary lock and performed any
+ * required validation, the node will respond to the web request with a status
+ * code of 150 - NodeContinue.
+ * </p>
+ *
+ * <p>
+ * At this point, the node that originated the request
+ * will verify that either all nodes obtained a lock or that at least one node
+ * failed to obtain a lock. If all nodes respond with a 150 - NodeContinue,
+ * then the second phase of the request will occur. In the second phase, the
+ * actual logic of the desired request is performed while the lock is held.
+ * The lock is then released, once the logic is performed (or if the logic fails
+ * to be performed).
+ * </p>
+ *
+ * <p>
+ * In the case that at least one node responds with a status code with than
+ * 150 - NodeContinue, the node that originated the request will instead issue
+ * a cancel request for the second phase so that all nodes are able to unlock
+ * the lock that was previously obtained for the request.
+ * </p>
+ *
+ * <p>
+ * A key consideration in this type of approach that must be taken into account
+ * is that the node that originated the request could, at any point in time, fail
+ * as a result of the process being killed, power loss, network connectivity problems,
+ * etc. As a result, the locks that are obtained through a DistributedLockingManager
+ * are designed to expire after some amount of time, so that locks are not held
+ * indefinitely, even in the case of node failure.
+ * </p>
+ */
+public interface DistributedLockingManager {
+
+    DistributedLock getReadLock();
+
+    DistributedLock getWriteLock();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
new file mode 100644
index 0000000..c673c9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+public class LockExpiredException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public LockExpiredException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
index 7585299..6b6bc45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -19,18 +19,16 @@ package org.apache.nifi.web.revision;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
-import java.util.function.Supplier;
 
 import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.web.InvalidRevisionException;
 import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.concurrent.DistributedLockingManager;
 
 
 /**
  * <p>
  * A Revision Manager provides the ability to prevent clients of the Web API from
- * stepping on one another. This is done by providing claims and locking mechanisms
+ * stepping on one another. This is done by providing revisions
  * for components individually.
  * </p>
  *
@@ -45,69 +43,28 @@ import org.apache.nifi.web.Revision;
  *
  * <p>
  * When the first phase of the two-phase commit is processed, the Revision Manager should
- * be used to obtain a Revision Claim by calling the {@link #requestClaim(Collection)}
- * method. If a Claim is granted, then the request validation may continue. If the
- * Claim is not granted, the request should fail and the second phase should not
- * be performed.
+ * be used to verify that the client-provided Revisions are current by calling the
+ * {@link #verifyRevisions(Collection)}
+ * method. If the revisions are up-to-date, the method will return successfully and the
+ * request validation may continue. Otherwise, the request should fail and the second phase
+ * should not be performed.
  * </p>
  *
  * <p>
  * If the first phase of the above two-phase commit completes and all nodes indicate that the
- * request may continue, this means that all nodes have provided granted a Claim on the Revisions
- * that are relevant. This Claim will automatically expire after some time. This expiration
- * means that if the node that issues the first phase never initiates the second phase (if the node
- * dies or loses network connectivitiy, for instance), then the Revision Claim will expire and
- * the Revision will remain unchanged.
+ * request may continue, this means that all nodes have agreed that the client's Revisios are
+ * acceptable.
  * </p>
  *
  * <p>
- * When the second phase begins, changes to the resource(s) must be made with the Revisions
- * locked. This is accomplished by wrapping the logic in a {@link Runnable} and passing the Runnable,
- * along with the {@link RevisionClaim} to the {@link #updateRevision(RevisionClaim, Supplier)} method.
+ * To ensure that the revisions remain consistent between the time that they are validated and
+ * the time that the modification takes place, it is important that the revisions always be
+ * validated while an appropriate read or write lock is held, via the {@link DistributedLockingManager}.
  * </p>
  */
 public interface RevisionManager {
 
     /**
-     * <p>
-     * Attempts to obtain a Revision Claim for Revisions supplied. If a Revision Claim
-     * is granted, no other thread will be allowed to modify any of the components for
-     * which a Revision is claimed until either the Revision Claim is relinquished by
-     * calling the {@link #updateRevision(RevisionClaim, Runnable)} method or the
-     * {@link #releaseClaim(RevisionClaim)} method, or the Revision Claim expires.
-     * </p>
-     *
-     * <p>
-     * This method is atomic. If a Revision Claim is unable to be obtained for any of the
-     * provided Revisions, then no Revision Claim will be obtained.
-     * </p>
-     *
-     * @param revisions a Set of Revisions, each of which corresponds to a different
-     *            component for which a Claim is to be acquired.
-     * @param user the user for which the claim is being requested
-     *
-     * @return the Revision Claim that was granted, if one was granted.
-     *
-     * @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
-     */
-    RevisionClaim requestClaim(Collection<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
-
-    /**
-     * <p>
-     * A convenience method that will call {@link #requestClaim(Collection)} by wrapping the given
-     * Revision in a Collection
-     * </p>
-     *
-     * @param revision the revision to request a claim for
-     * @param user the user for which the claim is being requested
-     *
-     * @return the Revision Claim that was granted, if one was granted.
-     *
-     * @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
-     */
-    RevisionClaim requestClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
-
-    /**
      * Returns the current Revision for the component with the given ID. If no Revision yet exists for the
      * component with the given ID, one will be created with a Version of 0 and no Client ID.
      *
@@ -135,7 +92,7 @@ public interface RevisionManager {
      *
      * @throws ExpiredRevisionClaimException if the Revision Claim has expired
      */
-    <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException;
+    <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task);
 
     /**
      * Performs the given task that is expected to remove a component from the flow. As a result,
@@ -152,64 +109,6 @@ public interface RevisionManager {
     <T> T deleteRevision(RevisionClaim claim, NiFiUser user, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException;
 
     /**
-     * Performs some operation to obtain an object of type T whose identifier is provided via
-     * the componentId argument, and return that object of type T while holding a Read Lock on
-     * the Revision for that object. Note that the callback provided must never modify the object
-     * with the given ID.
-     *
-     * @param callback the callback that is to be performed with the Read Lock held
-     * @return the value returned from the callback
-     */
-    <T> T get(String componentId, ReadOnlyRevisionCallback<T> callback);
-
-    /**
-     * Performs some operation to obtain an object of type T whose identifier is provided via
-     * the componentId argument, and return that object of type T while holding a Read Lock on
-     * the Revision for that object. Note that the callback provided must never modify the object
-     * with the given ID.
-     *
-     * @param callback the callback that is to be performed with the Read Lock held
-     * @return the value returned from the callback
-     */
-    <T> T get(Set<String> componentId, Supplier<T> callback);
-
-    /**
-     * Releases the claims on the revisions held by the given Revision Claim, if all of the Revisions
-     * are up-to-date.
-     *
-     * @param claim the claim that holds the revisions
-     * @param user the user that is releasing the claim. Must be the same user that claimed the revision.
-     *
-     * @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not
-     *         up-to-date
-     */
-    boolean releaseClaim(RevisionClaim claim, NiFiUser user);
-
-    /**
-     * Releases the claim on the revision for the given component if the claim was obtained by the calling thread
-     *
-     * @param componentId the ID of the component
-     * @return <code>true</code> if the claim was released, false otherwise
-     */
-    boolean cancelClaim(String componentId);
-
-    /**
-     * Releases the claim on the given revision if the claim was obtained by the calling thread
-     *
-     * @param revision the Revision to cancel the claim for
-     * @return <code>true</code> if the claim was released, false otherwise
-     */
-    boolean cancelClaim(Revision revision);
-
-    /**
-     * Releases the claims on the given revisions if the claim was obtained by the calling thread
-     *
-     * @param revisions the Revisions to cancel claims for
-     * @return <code>true</code> if all claims were released, false otherwise
-     */
-    boolean cancelClaims(Set<Revision> revisions);
-
-    /**
      * Clears any revisions that are currently held and resets the Revision Manager so that the revisions
      * present are those provided in the given collection
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index dd1f7e0..2d093ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.web;
 
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
@@ -90,13 +98,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
 import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
+import org.apache.nifi.web.concurrent.LockExpiredException;
 
 /**
  * Defines the NiFiServiceFacade interface.
@@ -115,56 +117,120 @@ public interface NiFiServiceFacade {
     void authorizeAccess(AuthorizeAccess authorizeAccess);
 
     /**
-     * Claims the specified revision for the specified user.
+     * Obtains a read (shared) lock for the entire flow, so that no other
+     * requests can be made to modify the flow until either this read lock
+     * is released via {@link #releaseReadLock()} or the lock expires
      *
-     * @param revision revision
-     * @param user user
-     * @throws InvalidRevisionException invalid revision
+     * @return an identifier that indicates the version of the lock, so that other
+     *         requests cannot release a lock that was held by this request
      */
-    void claimRevision(Revision revision, NiFiUser user) throws InvalidRevisionException;
+    String obtainReadLock();
 
     /**
-     * Claims the specified revisions for the specified user.
+     * Obtains a read (shared) lock for the entire flow, so that no other
+     * requests can be made to modify the flow until either this read lock
+     * is released via {@link #releaseReadLock()} or the lock expires
      *
-     * @param revisions revisions
-     * @param user user
-     * @throws InvalidRevisionException invalid revision
+     * @param versionId specifies a value to use for the Version ID for the lock
+     *
+     * @return an identifier that indicates the version of the lock, so that other
+     *         requests cannot release a lock that was held by this request
      */
-    void claimRevisions(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
+    String obtainReadLock(String versionId);
 
     /**
-     * Cancels the specified revision. Cancellation is only supported based on the current thread.
+     * Performs the given action while holding the read lock that has already been obtained
+     * with the given versionIdentifier. This allows the given action to be performed without
+     * allowing the read lock to expire until the entire action has completed.
      *
-     * @param revision revision
-     * @throws InvalidRevisionException invalid revision
+     * @param versionIdentifier the identifier that indicates the version of the lock that
+     *            is held. The value that is to be passed here is the value that was returned from the
+     *            call to {@link #obtainReadLock()}.
+     * @param action the action to perform
+     *
+     * @return the value returned by the action
+     * @throws LockExpiredException if the lock has expired before the action is invoked
+     * @throws Exception any Exception thrown by the given action is propagated
      */
-    void cancelRevision(Revision revision) throws InvalidRevisionException;
+    <T> T withReadLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
 
     /**
-     * Cancels the specified revisions. Cancellation is only supported based on the current thread.
+     * Releases the read lock held on this flow
      *
-     * @param revisions revision
-     * @throws InvalidRevisionException invalid revision
+     * @param versionIdentifier the identifier that indicates the version of the lock that
+     *            is held. The value that is to be passed here is the value that was returned from the
+     *            call to {@link #obtainReadLock()}.
+     *
+     * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
      */
-    void cancelRevisions(Set<Revision> revisions) throws InvalidRevisionException;
+    void releaseReadLock(String versionIdentifier) throws LockExpiredException;
 
     /**
-     * Releases the claim that is held on the given revision by the given user
+     * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
+     * requests can be made to read or modify the flow until either this write lock
+     * is released via {@link #releaseWriteLock()} or the lock expires
      *
-     * @param revision the revision
-     * @param user the user
-     * @throws InvalidRevisionException if the revision is invalid
+     * @return an identifier that indicates the version of the lock, so that other
+     *         requests cannot release a lock that was held by this request
      */
-    void releaseRevisionClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
+    String obtainWriteLock();
 
     /**
-     * Releases the claim that is held on the given revisions by the given user
+     * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
+     * requests can be made to read or modify the flow until either this write lock
+     * is released via {@link #releaseWriteLock()} or the lock expires
      *
-     * @param revisions the revisions
-     * @param user the user
-     * @throws InvalidRevisionException if the revision is invalid
+     * @param versionId specifies a value to use for the Version ID for the lock
+     *
+     * @return an identifier that indicates the version of the lock, so that other
+     *         requests cannot release a lock that was held by this request
+     */
+    String obtainWriteLock(String versionId);
+
+    /**
+     * Performs the given action while holding the write lock that has already been obtained
+     * with the given versionIdentifier. This allows the given action to be performed without
+     * allowing the write lock to expire until the entire action has completed.
+     *
+     * @param versionIdentifier the identifier that indicates the version of the lock that
+     *            is held. The value that is to be passed here is the value that was returned from the
+     *            call to {@link #obtainWriteLock()}.
+     * @param action the action to perform
+     *
+     * @return the value returned by the action
+     * @throws LockExpiredException if the lock has expired before the action is invoked
+     * @throws Exception any Exception thrown by the given action is propagated
+     */
+    <T> T withWriteLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
+
+    /**
+     * Releases the write lock held on the flow
+     *
+     * @param versionIdentifier the identifier that indicates the version of the lock that
+     *            is held. The value that is to be passed here is the value that was returned from the
+     *            call to {@link #obtainWriteLock()}.
+     *
+     * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
+     */
+    void releaseWriteLock(String versionIdentifier) throws LockExpiredException;
+
+    /**
+     * Claims the specified revision for the specified user.
+     *
+     * @param revision revision
+     * @param user user
+     * @throws InvalidRevisionException invalid revision
+     */
+    void verifyRevision(Revision revision, NiFiUser user) throws InvalidRevisionException;
+
+    /**
+     * Claims the specified revisions for the specified user.
+     *
+     * @param revisions revisions
+     * @param user user
+     * @throws InvalidRevisionException invalid revision
      */
-    void releaseRevisionClaims(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
+    void verifyRevisions(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
 
     /**
      * Gets the current revisions for the components based on the specified function.