You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/06 18:36:48 UTC
[4/4] nifi git commit: NIFI-2170: Refactor RevisionManager into a
RevisionManager and a DistributedLockingManager. This closes #610
NIFI-2170: Refactor RevisionManager into a RevisionManager and a DistributedLockingManager. This closes #610
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f4c94e34
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f4c94e34
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f4c94e34
Branch: refs/heads/master
Commit: f4c94e349cdf819aa5e16957ff1c6897c469dc50
Parents: 181386b
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jul 5 13:02:03 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Jul 6 14:36:12 2016 -0400
----------------------------------------------------------------------
.../http/replication/RequestReplicator.java | 6 +-
.../ThreadPoolRequestReplicator.java | 17 +-
.../nifi/web/concurrent/DistributedLock.java | 111 ++
.../concurrent/DistributedLockingManager.java | 71 ++
.../web/concurrent/LockExpiredException.java | 26 +
.../nifi/web/revision/RevisionManager.java | 127 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 136 ++-
.../nifi/web/StandardNiFiServiceFacade.java | 1108 +++++++-----------
.../StandardNiFiWebConfigurationContext.java | 58 +-
.../nifi/web/api/ApplicationResource.java | 141 ++-
.../api/config/LockExpiredExceptionMapper.java | 44 +
.../src/main/resources/nifi-web-api-context.xml | 7 +-
.../concurrent/DistributedReadWriteLock.java | 49 +
.../apache/nifi/web/concurrent/LockInfo.java | 55 +
.../apache/nifi/web/concurrent/LockMode.java | 26 +
.../nifi/web/concurrent/ReadWriteLockSync.java | 32 +
.../concurrent/ReentrantDistributedLock.java | 174 +++
.../nifi/web/revision/NaiveRevisionManager.java | 710 +----------
.../TestReentrantDistributedLock.java | 216 ++++
.../web/revision/TestNaiveRevisionManager.java | 660 -----------
20 files changed, 1548 insertions(+), 2226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index a4b762a..fad7454 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -38,7 +38,11 @@ public interface RequestReplicator {
public static final String NODE_CONTINUE = "150-NodeContinue";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
- public static final String CLAIM_CANCEL_HEADER = "X-Cancel-Claim";
+ /**
+ * Indicates that the request is intended to cancel a previously obtained lock without performing the action
+ */
+ public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock";
+ public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
/**
* Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 3d90b6f..7a9b56f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -325,6 +325,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
+ // Add the Lock Version ID to the headers so that it is used in all requests for this transaction
+ final String lockVersionId = UUID.randomUUID().toString();
+ headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId);
+
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
@@ -361,20 +365,21 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
return;
}
- final Thread cancelClaimThread = new Thread(new Runnable() {
+ final Map<String, String> cancelLockHeaders = new HashMap<>(updatedHeaders);
+ cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true");
+ final Thread cancelLockThread = new Thread(new Runnable() {
@Override
public void run() {
logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
- updatedHeaders.put(CLAIM_CANCEL_HEADER, "true");
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
- nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, null);
+ nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
- replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
+ replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
}
});
- cancelClaimThread.setName("Cancel Claims");
- cancelClaimThread.start();
+ cancelLockThread.setName("Cancel Flow Locks");
+ cancelLockThread.start();
// Add a NodeResponse for each node to the Cluster Response
// Check that all nodes responded successfully.
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
new file mode 100644
index 0000000..b5c974f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public interface DistributedLock {
+
+ /**
+ * Obtains a lock, blocking as long as necessary to obtain the lock.
+ * Once a lock has been obtained, the identifier of the version of the lock is returned,
+ * which can be passed to the {@link #withLock(String, Supplier)} or
+ * {@link #unlock(String)} method. Once this method returns, it is
+ * important that either the {@link #withLock(String, Supplier)} or
+ * {@link #unlock(String)} method be called with this identifier. Otherwise,
+ * any attempt to claim another read lock or write lock will block until this
+ * lock expires.
+ *
+ * @return the identifier
+ */
+ String lock();
+
+ /**
+ * Obtains a lock, blocking as long as necessary to obtain the lock.
+ * Once a lock has been obtained, the identifier of the version of the lock is returned,
+ * which can be passed to the {@link #withLock(String, Supplier)} or
+ * {@link #unlock(String)} method. Once this method returns, it is
+ * important that either the {@link #withLock(String, Supplier)} or
+ * {@link #unlock(String)} method be called with this identifier. Otherwise,
+ * any attempt to claim another read lock or write lock will block until this
+ * lock expires.
+ *
+ * @param versionIdentifier a value that should be used as the version id instead of generating one.
+ * This allows us to ensure that all nodes in the cluster use the same id.
+ *
+ * @return the identifier
+ */
+ String lock(String versionIdentifier);
+
+ /**
+ * Waits up to the given amount of time to obtain a lock. If the lock is obtained
+ * within this time period, the identifier will be returned, as with {@link #lock()}.
+ * If the lock cannot be obtained within the given time period, <code>null</code> will
+ * be returned.
+ *
+ * @param time the maximum amount of time to wait for the lock
+ * @param timeUnit the unit of time that the time parameter is in
+ * @return the identifier of the lock, or <code>null</code> if no lock is obtained
+ */
+ String tryLock(long time, TimeUnit timeUnit);
+
+ /**
+ * Waits up to the given amount of time to obtain a lock. If the lock is obtained
+ * within this time period, the identifier will be returned, as with {@link #lock()}.
+ * If the lock cannot be obtained within the given time period, <code>null</code> will
+ * be returned.
+ *
+ * @param time the maximum amount of time to wait for the lock
+ * @param timeUnit the unit of time that the time parameter is in
+ * @param versionIdentifier a value that should be used as the version id instead of generating one.
+ * This allows us to ensure that all nodes in the cluster use the same id.
+ * @return the identifier of the lock, or <code>null</code> if no lock is obtained
+ */
+ String tryLock(long time, TimeUnit timeUnit, String versionIdentifier);
+
+ /**
+ * Performs the given action while this lock is held. The identifier of the lock that was
+ * obtained by calling {@link #lock()} must be provided to this method. If the
+ * lock identifier is incorrect, or the lock has expired, a {@link LockExpiredException}
+ * will be thrown. This method provides a mechanism for verifying that the lock obtained
+ * by {@link #lock()} or {@link #tryLock(long, TimeUnit)} is still valid and that the action
+ * being performed will be done so without the lock expiring (i.e., if the lock expires while
+ * the action is being performed, the lock won't be released until the provided action completes).
+ *
+ * @param identifier the identifier of the lock that has already been obtained
+ * @param action the action to perform
+ *
+ * @return the value returned by the given action
+ *
+ * @throws LockExpiredException if the provided identifier is not the identifier of the currently
+ * held lock, or if the lock that was obtained has already expired and is no longer valid
+ */
+ <T> T withLock(String identifier, Supplier<T> action) throws LockExpiredException;
+
+ /**
+ * Cancels the lock with the given identifier, so that the lock is no longer valid.
+ *
+ * @param identifier the identifier of the lock that was obtained by calling {@link #lock()}.
+ *
+ * @throws LockExpiredException if the provided identifier is not the identifier of the currently
+ * held lock, or if the lock that was obtained has already expired and is no longer valid
+ */
+ void unlock(String identifier) throws LockExpiredException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
new file mode 100644
index 0000000..09c1d5d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+/**
+ * <p>
+ * A DistributedLockingManager is responsible for exposing a mechanism that
+ * clients can use to obtain a lock on the dataflow.
+ * </p>
+ *
+ * <p>
+ * Because of the way in which NiFi replicates requests from one node to all
+ * other nodes in the cluster, it is important that all nodes in the cluster
+ * are able to obtain a lock for the request before the request is allowed to
+ * proceed. This is accomplished by using a two-phase approach. For each request
+ * that will require a lock (either a read (shared) lock or a write (mutually
+ * exclusive) lock), the request must be done in two phases. The first phase is
+ * responsible for obtaining the lock and optionally performing validation of
+ * the request. Once a node has obtained the necessary lock and performed any
+ * required validation, the node will respond to the web request with a status
+ * code of 150 - NodeContinue.
+ * </p>
+ *
+ * <p>
+ * At this point, the node that originated the request
+ * will verify that either all nodes obtained a lock or that at least one node
+ * failed to obtain a lock. If all nodes respond with a 150 - NodeContinue,
+ * then the second phase of the request will occur. In the second phase, the
+ * actual logic of the desired request is performed while the lock is held.
+ * The lock is then released, once the logic is performed (or if the logic fails
+ * to be performed).
+ * </p>
+ *
+ * <p>
+ * In the case that at least one node responds with a status code other than
+ * 150 - NodeContinue, the node that originated the request will instead issue
+ * a cancel request for the second phase so that all nodes are able to unlock
+ * the lock that was previously obtained for the request.
+ * </p>
+ *
+ * <p>
+ * A key consideration in this type of approach that must be taken into account
+ * is that the node that originated the request could, at any point in time, fail
+ * as a result of the process being killed, power loss, network connectivity problems,
+ * etc. As a result, the locks that are obtained through a DistributedLockingManager
+ * are designed to expire after some amount of time, so that locks are not held
+ * indefinitely, even in the case of node failure.
+ * </p>
+ */
+public interface DistributedLockingManager {
+
+ DistributedLock getReadLock();
+
+ DistributedLock getWriteLock();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
new file mode 100644
index 0000000..c673c9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+public class LockExpiredException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LockExpiredException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
index 7585299..6b6bc45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -19,18 +19,16 @@ package org.apache.nifi.web.revision;
import java.util.Collection;
import java.util.List;
-import java.util.Set;
-import java.util.function.Supplier;
import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.concurrent.DistributedLockingManager;
/**
* <p>
* A Revision Manager provides the ability to prevent clients of the Web API from
- * stepping on one another. This is done by providing claims and locking mechanisms
+ * stepping on one another. This is done by providing revisions
* for components individually.
* </p>
*
@@ -45,69 +43,28 @@ import org.apache.nifi.web.Revision;
*
* <p>
* When the first phase of the two-phase commit is processed, the Revision Manager should
- * be used to obtain a Revision Claim by calling the {@link #requestClaim(Collection)}
- * method. If a Claim is granted, then the request validation may continue. If the
- * Claim is not granted, the request should fail and the second phase should not
- * be performed.
+ * be used to verify that the client-provided Revisions are current by calling the
+ * {@link #verifyRevisions(Collection)}
+ * method. If the revisions are up-to-date, the method will return successfully and the
+ * request validation may continue. Otherwise, the request should fail and the second phase
+ * should not be performed.
* </p>
*
* <p>
* If the first phase of the above two-phase commit completes and all nodes indicate that the
- * request may continue, this means that all nodes have provided granted a Claim on the Revisions
- * that are relevant. This Claim will automatically expire after some time. This expiration
- * means that if the node that issues the first phase never initiates the second phase (if the node
- * dies or loses network connectivitiy, for instance), then the Revision Claim will expire and
- * the Revision will remain unchanged.
+ * request may continue, this means that all nodes have agreed that the client's Revisions are
+ * acceptable.
* </p>
*
* <p>
- * When the second phase begins, changes to the resource(s) must be made with the Revisions
- * locked. This is accomplished by wrapping the logic in a {@link Runnable} and passing the Runnable,
- * along with the {@link RevisionClaim} to the {@link #updateRevision(RevisionClaim, Supplier)} method.
+ * To ensure that the revisions remain consistent between the time that they are validated and
+ * the time that the modification takes place, it is important that the revisions always be
+ * validated while an appropriate read or write lock is held, via the {@link DistributedLockingManager}.
* </p>
*/
public interface RevisionManager {
/**
- * <p>
- * Attempts to obtain a Revision Claim for Revisions supplied. If a Revision Claim
- * is granted, no other thread will be allowed to modify any of the components for
- * which a Revision is claimed until either the Revision Claim is relinquished by
- * calling the {@link #updateRevision(RevisionClaim, Runnable)} method or the
- * {@link #releaseClaim(RevisionClaim)} method, or the Revision Claim expires.
- * </p>
- *
- * <p>
- * This method is atomic. If a Revision Claim is unable to be obtained for any of the
- * provided Revisions, then no Revision Claim will be obtained.
- * </p>
- *
- * @param revisions a Set of Revisions, each of which corresponds to a different
- * component for which a Claim is to be acquired.
- * @param user the user for which the claim is being requested
- *
- * @return the Revision Claim that was granted, if one was granted.
- *
- * @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
- */
- RevisionClaim requestClaim(Collection<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
-
- /**
- * <p>
- * A convenience method that will call {@link #requestClaim(Collection)} by wrapping the given
- * Revision in a Collection
- * </p>
- *
- * @param revision the revision to request a claim for
- * @param user the user for which the claim is being requested
- *
- * @return the Revision Claim that was granted, if one was granted.
- *
- * @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
- */
- RevisionClaim requestClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
-
- /**
* Returns the current Revision for the component with the given ID. If no Revision yet exists for the
* component with the given ID, one will be created with a Version of 0 and no Client ID.
*
@@ -135,7 +92,7 @@ public interface RevisionManager {
*
* @throws ExpiredRevisionClaimException if the Revision Claim has expired
*/
- <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException;
+ <T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task);
/**
* Performs the given task that is expected to remove a component from the flow. As a result,
@@ -152,64 +109,6 @@ public interface RevisionManager {
<T> T deleteRevision(RevisionClaim claim, NiFiUser user, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException;
/**
- * Performs some operation to obtain an object of type T whose identifier is provided via
- * the componentId argument, and return that object of type T while holding a Read Lock on
- * the Revision for that object. Note that the callback provided must never modify the object
- * with the given ID.
- *
- * @param callback the callback that is to be performed with the Read Lock held
- * @return the value returned from the callback
- */
- <T> T get(String componentId, ReadOnlyRevisionCallback<T> callback);
-
- /**
- * Performs some operation to obtain an object of type T whose identifier is provided via
- * the componentId argument, and return that object of type T while holding a Read Lock on
- * the Revision for that object. Note that the callback provided must never modify the object
- * with the given ID.
- *
- * @param callback the callback that is to be performed with the Read Lock held
- * @return the value returned from the callback
- */
- <T> T get(Set<String> componentId, Supplier<T> callback);
-
- /**
- * Releases the claims on the revisions held by the given Revision Claim, if all of the Revisions
- * are up-to-date.
- *
- * @param claim the claim that holds the revisions
- * @param user the user that is releasing the claim. Must be the same user that claimed the revision.
- *
- * @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not
- * up-to-date
- */
- boolean releaseClaim(RevisionClaim claim, NiFiUser user);
-
- /**
- * Releases the claim on the revision for the given component if the claim was obtained by the calling thread
- *
- * @param componentId the ID of the component
- * @return <code>true</code> if the claim was released, false otherwise
- */
- boolean cancelClaim(String componentId);
-
- /**
- * Releases the claim on the given revision if the claim was obtained by the calling thread
- *
- * @param revision the Revision to cancel the claim for
- * @return <code>true</code> if the claim was released, false otherwise
- */
- boolean cancelClaim(Revision revision);
-
- /**
- * Releases the claims on the given revisions if the claim was obtained by the calling thread
- *
- * @param revisions the Revisions to cancel claims for
- * @return <code>true</code> if all claims were released, false otherwise
- */
- boolean cancelClaims(Set<Revision> revisions);
-
- /**
* Clears any revisions that are currently held and resets the Revision Manager so that the revisions
* present are those provided in the given collection
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index dd1f7e0..2d093ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.web;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.repository.claim.ContentDirection;
@@ -90,13 +98,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
+import org.apache.nifi.web.concurrent.LockExpiredException;
/**
* Defines the NiFiServiceFacade interface.
@@ -115,56 +117,120 @@ public interface NiFiServiceFacade {
void authorizeAccess(AuthorizeAccess authorizeAccess);
/**
- * Claims the specified revision for the specified user.
+ * Obtains a read (shared) lock for the entire flow, so that no other
+ * requests can be made to modify the flow until either this read lock
+ * is released via {@link #releaseReadLock(String)} or the lock expires
*
- * @param revision revision
- * @param user user
- * @throws InvalidRevisionException invalid revision
+ * @return an identifier that indicates the version of the lock, so that other
+ * requests cannot release a lock that was held by this request
*/
- void claimRevision(Revision revision, NiFiUser user) throws InvalidRevisionException;
+ String obtainReadLock();
/**
- * Claims the specified revisions for the specified user.
+ * Obtains a read (shared) lock for the entire flow, so that no other
+ * requests can be made to modify the flow until either this read lock
+ * is released via {@link #releaseReadLock(String)} or the lock expires
*
- * @param revisions revisions
- * @param user user
- * @throws InvalidRevisionException invalid revision
+ * @param versionId specifies a value to use for the Version ID for the lock
+ *
+ * @return an identifier that indicates the version of the lock, so that other
+ * requests cannot release a lock that was held by this request
*/
- void claimRevisions(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
+ String obtainReadLock(String versionId);
/**
- * Cancels the specified revision. Cancellation is only supported based on the current thread.
+ * Performs the given action while holding the read lock that has already been obtained
+ * with the given versionIdentifier. This allows the given action to be performed without
+ * allowing the read lock to expire until the entire action has completed.
*
- * @param revision revision
- * @throws InvalidRevisionException invalid revision
+ * @param versionIdentifier the identifier that indicates the version of the lock that
+ * is held. The value that is to be passed here is the value that was returned from the
+ * call to {@link #obtainReadLock()}.
+ * @param action the action to perform
+ *
+ * @return the value returned by the action
+ * @throws LockExpiredException if the lock has expired before the action is invoked
+ * @throws RuntimeException any unchecked exception thrown by the given action is propagated
*/
- void cancelRevision(Revision revision) throws InvalidRevisionException;
+ <T> T withReadLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
/**
- * Cancels the specified revisions. Cancellation is only supported based on the current thread.
+ * Releases the read lock held on this flow
*
- * @param revisions revision
- * @throws InvalidRevisionException invalid revision
+ * @param versionIdentifier the identifier that indicates the version of the lock that
+ * is held. The value that is to be passed here is the value that was returned from the
+ * call to {@link #obtainReadLock()}.
+ *
+ * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
*/
- void cancelRevisions(Set<Revision> revisions) throws InvalidRevisionException;
+ void releaseReadLock(String versionIdentifier) throws LockExpiredException;
/**
- * Releases the claim that is held on the given revision by the given user
+ * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
+ * requests can be made to read or modify the flow until either this write lock
+ * is released via {@link #releaseWriteLock(String)} or the lock expires
*
- * @param revision the revision
- * @param user the user
- * @throws InvalidRevisionException if the revision is invalid
+ * @return an identifier that indicates the version of the lock, so that other
+ * requests cannot release a lock that was held by this request
*/
- void releaseRevisionClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
+ String obtainWriteLock();
/**
- * Releases the claim that is held on the given revisions by the given user
+ * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
+ * requests can be made to read or modify the flow until either this write lock
+ * is released via {@link #releaseWriteLock(String)} or the lock expires
*
- * @param revisions the revisions
- * @param user the user
- * @throws InvalidRevisionException if the revision is invalid
+ * @param versionId specifies a value to use for the Version ID for the lock
+ *
+ * @return an identifier that indicates the version of the lock, so that other
+ * requests cannot release a lock that was held by this request
+ */
+ String obtainWriteLock(String versionId);
+
+ /**
+ * Performs the given action while holding the write lock that has already been obtained
+ * with the given versionIdentifier. This allows the given action to be performed without
+ * allowing the write lock to expire until the entire action has completed.
+ *
+ * @param versionIdentifier the identifier that indicates the version of the lock that
+ * is held. The value that is to be passed here is the value that was returned from the
+ * call to {@link #obtainWriteLock()}.
+ * @param action the action to perform
+ *
+ * @return the value returned by the action
+ * @throws LockExpiredException if the lock has expired before the action is invoked
+ * @throws RuntimeException any unchecked exception thrown by the given action is propagated
+ */
+ <T> T withWriteLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
+
+ /**
+ * Releases the write lock held on the flow
+ *
+ * @param versionIdentifier the identifier that indicates the version of the lock that
+ * is held. The value that is to be passed here is the value that was returned from the
+ * call to {@link #obtainWriteLock()}.
+ *
+ * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
+ */
+ void releaseWriteLock(String versionIdentifier) throws LockExpiredException;
+
+ /**
+ * Verifies the specified revision for the specified user.
+ *
+ * @param revision revision
+ * @param user user
+ * @throws InvalidRevisionException invalid revision
+ */
+ void verifyRevision(Revision revision, NiFiUser user) throws InvalidRevisionException;
+
+ /**
+ * Verifies the specified revisions for the specified user.
+ *
+ * @param revisions revisions
+ * @param user user
+ * @throws InvalidRevisionException invalid revision
*/
- void releaseRevisionClaims(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
+ void verifyRevisions(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
/**
* Gets the current revisions for the components based on the specified function.