You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/06 18:36:46 UTC

[2/4] nifi git commit: NIFI-2170: Refactor RevisionManager into a RevisionManager and a DistributedLockingManager. This closes #610

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index b42f839..794c5a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -52,6 +52,7 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.concurrent.LockExpiredException;
 import org.apache.nifi.web.util.ClientResponseUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -410,17 +411,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 }
                 processor = entity.getComponent();
             } else {
-                // claim the revision
-                serviceFacade.claimRevision(revision, user);
+                // update processor within write lock
+                final String writeLockId = serviceFacade.obtainWriteLock();
                 try {
-
-                    ProcessorDTO processorDTO = buildProcessorDto(id,annotationData,properties);
-                    final ProcessorEntity entity = serviceFacade.updateProcessor(revision,processorDTO);
-                    processor = entity.getComponent();
-
+                    processor = serviceFacade.withWriteLock(writeLockId, () -> {
+                        ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
+                        final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
+                        return entity.getComponent();
+                    });
                 } finally {
-                    // ensure the revision is canceled.. if the operation succeed, this is a noop
-                    serviceFacade.cancelRevision(revision);
+                    // ensure the lock is released
+                    try {
+                        serviceFacade.releaseWriteLock(writeLockId);
+                    } catch (final LockExpiredException e) {
+                    }
                 }
             }
 
@@ -565,15 +569,19 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 controllerServiceDto.setAnnotationData(annotationData);
                 controllerServiceDto.setProperties(properties);
 
-                // claim the revision
-                serviceFacade.claimRevision(revision, user);
+                // update controller service within write lock
+                final String writeLockId = serviceFacade.obtainWriteLock();
                 try {
-                    // perform the update
-                    final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
-                    controllerService = entity.getComponent();
+                    controllerService = serviceFacade.withWriteLock(writeLockId, () -> {
+                        final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
+                        return entity.getComponent();
+                    });
                 } finally {
-                    // ensure the revision is canceled.. if the operation succeed, this is a noop
-                    serviceFacade.cancelRevision(revision);
+                    // ensure the lock is released
+                    try {
+                        serviceFacade.releaseWriteLock(writeLockId);
+                    } catch (final LockExpiredException e) {
+                    }
                 }
             } else {
                 // if this is a standalone instance the service should have been found above... there should
@@ -733,14 +741,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 reportingTaskDto.setAnnotationData(annotationData);
                 reportingTaskDto.setProperties(properties);
 
-                // claim the revision
-                serviceFacade.claimRevision(revision, user);
+                // obtain write lock
+                final String writeLockId = serviceFacade.obtainWriteLock();
                 try {
-                    final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
-                    reportingTask = entity.getComponent();
+                    reportingTask = serviceFacade.withWriteLock(writeLockId, () -> {
+                        serviceFacade.verifyRevision(revision, user);
+                        final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
+                        return entity.getComponent();
+                    });
                 } finally {
-                    // ensure the revision is canceled.. if the operation succeed, this is a noop
-                    serviceFacade.cancelRevision(revision);
+                    // ensure the lock is released
+                    try {
+                        serviceFacade.releaseWriteLock(writeLockId);
+                    } catch (final LockExpiredException e) {
+                    }
                 }
             } else {
                 // if this is a standalone instance the task should have been found above... there should

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 120c387..6fdca03 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,10 +16,31 @@
  */
 package org.apache.nifi.web.api;
 
-import com.sun.jersey.api.core.HttpContext;
-import com.sun.jersey.api.representation.Form;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
+import javax.ws.rs.core.UriInfo;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
@@ -40,32 +61,14 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.SnippetDTO;
 import org.apache.nifi.web.api.entity.ComponentEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.concurrent.LockExpiredException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.CacheControl;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriBuilderException;
-import javax.ws.rs.core.UriInfo;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import com.sun.jersey.api.core.HttpContext;
+import com.sun.jersey.api.representation.Form;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
 
 /**
  * Base class for controllers.
@@ -343,8 +346,8 @@ public abstract class ApplicationResource {
         return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
     }
 
-    protected boolean isClaimCancelationPhase(final HttpServletRequest httpServletRequest) {
-        return httpServletRequest.getHeader(RequestReplicator.CLAIM_CANCEL_HEADER) != null;
+    protected boolean isLockCancelationPhase(final HttpServletRequest httpServletRequest) {
+        return httpServletRequest.getHeader(RequestReplicator.LOCK_CANCELATION_HEADER) != null;
     }
 
     /**
@@ -444,9 +447,7 @@ public abstract class ApplicationResource {
 
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
         return withWriteLock(serviceFacade, authorizer, verifier, action,
-                () -> serviceFacade.claimRevision(revision, user),
-                () -> serviceFacade.cancelRevision(revision),
-                () -> serviceFacade.releaseRevisionClaim(revision, user));
+            () -> serviceFacade.verifyRevision(revision, user));
     }
 
     /**
@@ -463,56 +464,76 @@ public abstract class ApplicationResource {
                                      final Runnable verifier, final Supplier<Response> action) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
         return withWriteLock(serviceFacade, authorizer, verifier, action,
-                () -> serviceFacade.claimRevisions(revisions, user),
-                () -> serviceFacade.cancelRevisions(revisions),
-                () -> serviceFacade.releaseRevisionClaims(revisions, user));
+            () -> serviceFacade.verifyRevisions(revisions, user));
     }
 
 
     /**
      * Executes an action through the service facade using the specified revision.
      *
-     * @param serviceFacade  service facade
-     * @param authorizer     authorizer
-     * @param verifier       verifier
-     * @param action         the action to execute
-     * @param claimRevision  a callback that will claim the necessary revisions for the operation
-     * @param cancelRevision a callback that will cancel the necessary revisions if the operation fails
-     * @param releaseClaim   a callback that will release any previously claimed revision if the operation is canceled after the first phase
+     * @param serviceFacade service facade
+     * @param authorizer authorizer
+     * @param verifier verifier
+     * @param action the action to execute
+     * @param verifyRevision a callback that will verify the necessary revisions for the operation
      * @return the response
      */
     private Response withWriteLock(
             final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action,
-            final Runnable claimRevision, final Runnable cancelRevision, final Runnable releaseClaim) {
+        final Runnable verifyRevision) {
+
+        if (isLockCancelationPhase(httpServletRequest)) {
+            final String lockVersionId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
+            try {
+                serviceFacade.releaseWriteLock(lockVersionId);
+            } catch (final Exception e) {
+                // If the lock has expired, then it has already been unlocked.
+            }
 
-        if (isClaimCancelationPhase(httpServletRequest)) {
-            releaseClaim.run();
             return generateOkResponse().build();
         }
 
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            // authorize access
-            serviceFacade.authorizeAccess(authorizer);
-            claimRevision.run();
-        }
-
+        String lockId = null;
         try {
+            final boolean validationPhase = isValidationPhase(httpServletRequest);
+            if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
+                // authorize access
+                serviceFacade.authorizeAccess(authorizer);
+
+                lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
+                lockId = serviceFacade.obtainWriteLock(lockId);
+                verifyRevision.run();
+            } else {
+                lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
+            }
+
             if (validationPhase) {
                 if (verifier != null) {
                     verifier.run();
                 }
                 return generateContinueResponse().build();
             }
-        } catch (final Exception e) {
-            cancelRevision.run();
-            throw e;
-        }
 
-        try {
-            return action.get();
-        } finally {
-            cancelRevision.run();
+            try {
+                return serviceFacade.withWriteLock(lockId, () -> action.get());
+            } finally {
+                try {
+                    serviceFacade.releaseWriteLock(lockId);
+                } catch (final LockExpiredException e) {
+                    // If the lock expires here, it's okay. We've already completed our action,
+                    // so the expiration of the lock is of no consequence to us.
+                }
+            }
+        } catch (final RuntimeException t) {
+            if (lockId != null) {
+                try {
+                    serviceFacade.releaseWriteLock(lockId);
+                } catch (final Exception e) {
+                    t.addSuppressed(e);
+                }
+            }
+
+            throw t;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
new file mode 100644
index 0000000..56c87e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.config;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.concurrent.LockExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Provider
+public class LockExpiredExceptionMapper implements ExceptionMapper<LockExpiredException> {
+    private static final Logger logger = LoggerFactory.getLogger(LockExpiredExceptionMapper.class);
+    // Translates a LockExpiredException into a 409 Conflict response whose plain-text body is the exception message.
+    @Override
+    public Response toResponse(LockExpiredException exception) {
+        // log the error
+        logger.warn(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT));
+        // the full stack trace is only written when debug logging is enabled
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, exception);
+        }
+
+        return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index a1a63a6..15d2f0f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -33,7 +33,6 @@
 
     <!-- revision manager -->
     <bean id="revisionManager" class="org.apache.nifi.web.revision.NaiveRevisionManager">
-        <constructor-arg ref="nifiProperties"></constructor-arg>
     </bean>
     
     <!-- content access -->
@@ -164,6 +163,11 @@
         <property name="clusterCoordinator" ref="clusterCoordinator"/>
         <property name="heartbeatMonitor" ref="heartbeatMonitor" />
         <property name="bulletinRepository" ref="bulletinRepository"/>
+        <property name="lockManager" ref="lockManager" />
+    </bean>
+    
+    <bean id="lockManager" class="org.apache.nifi.web.concurrent.DistributedReadWriteLock">
+        <constructor-arg ref="nifiProperties" />
     </bean>
 
     <!-- component ui extension configuration context -->
@@ -393,6 +397,7 @@
     <bean class="org.apache.nifi.web.api.config.IllegalNodeReconnectionExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.IllegalStateExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.InvalidRevisionExceptionMapper" scope="singleton"/>
+    <bean class="org.apache.nifi.web.api.config.LockExpiredExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.JsonMappingExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.JsonParseExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.MutableRequestExceptionMapper" scope="singleton"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
new file mode 100644
index 0000000..608d0c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+public class DistributedReadWriteLock implements DistributedLockingManager {
+    private final DistributedLock readLock;
+    private final DistributedLock writeLock;
+    // Takes the lock expiration, in milliseconds, from the request-replication claim timeout property (or its default).
+    public DistributedReadWriteLock(final NiFiProperties properties) {
+        this(FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
+            NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+    }
+    // Builds a SHARED read lock and a MUTUALLY_EXCLUSIVE write lock over one ReadWriteLockSync, both with the given expiration.
+    public DistributedReadWriteLock(final long lockExpirationPeriod, final TimeUnit lockExpirationUnit) {
+        final ReadWriteLockSync sync = new ReadWriteLockSync();
+        readLock = new ReentrantDistributedLock(LockMode.SHARED, sync, lockExpirationPeriod, lockExpirationUnit);
+        writeLock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, lockExpirationPeriod, lockExpirationUnit);
+    }
+    // Returns the SHARED-mode lock created in the constructor.
+    @Override
+    public DistributedLock getReadLock() {
+        return readLock;
+    }
+    // Returns the MUTUALLY_EXCLUSIVE-mode lock created in the constructor.
+    @Override
+    public DistributedLock getWriteLock() {
+        return writeLock;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
new file mode 100644
index 0000000..a34bb5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+public class LockInfo {
+    private final String versionId;
+    private final int lockCount;
+    private final LockMode lockMode;
+    private final long expirationTime;
+    // The expiration deadline is fixed at construction: System.nanoTime() plus the given period converted to nanoseconds.
+    public LockInfo(final String versionId, final LockMode lockMode, final int lockCount, final long expirationPeriod, final TimeUnit expirationUnit) {
+        this.versionId = versionId;
+        this.lockMode = lockMode;
+        this.lockCount = lockCount;
+        this.expirationTime = System.nanoTime() + expirationUnit.toNanos(expirationPeriod);
+    }
+    // Compares by subtraction, as the System.nanoTime() contract requires, so the result stays correct on numeric overflow.
+    public boolean isExpired() {
+        return System.nanoTime() - expirationTime > 0;
+    }
+
+    public String getVersionId() {
+        return versionId;
+    }
+
+    public int getLockCount() {
+        return lockCount;
+    }
+
+    public LockMode getLockMode() {
+        return lockMode;
+    }
+
+    @Override
+    public String toString() {
+        return "LockInfo[versionId=" + versionId + ", lockMode=" + lockMode + ", lockCount = " + lockCount + ", expired=" + isExpired() + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
new file mode 100644
index 0000000..2cc5eac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+public enum LockMode {
+    // Mode given to the read lock that DistributedReadWriteLock constructs.
+    SHARED,
+    // Mode given to the write lock that DistributedReadWriteLock constructs.
+    MUTUALLY_EXCLUSIVE;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
new file mode 100644
index 0000000..ef32f8f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ReadWriteLockSync {
+    private final AtomicReference<LockInfo> lockInfoRef = new AtomicReference<>();
+    // Returns the currently stored LockInfo; null until the first successful update, since the reference starts empty.
+    public LockInfo get() {
+        return lockInfoRef.get();
+    }
+    // Atomically swaps currentLock for updatedLock; returns false if the stored reference is no longer currentLock.
+    public boolean update(final LockInfo currentLock, final LockInfo updatedLock) {
+        return lockInfoRef.compareAndSet(currentLock, updatedLock);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
new file mode 100644
index 0000000..c51feec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DistributedLock} whose state is stored in a caller-supplied {@link ReadWriteLockSync}.
+ * Obtaining the lock stores a {@link LockInfo} (Version ID, lock mode, lock count and expiration
+ * period) in the sync; a stored LockInfo that reports itself as expired is treated as "not locked".
+ * A SHARED lock may be obtained while another SHARED lock is held, in which case the lock count is
+ * incremented. Any other combination has to wait until the current lock is released or expires.
+ */
+public class ReentrantDistributedLock implements DistributedLock {
+    private static final Logger logger = LoggerFactory.getLogger(ReentrantDistributedLock.class);
+
+    private final long expirationNanos;
+
+    private final ReadWriteLockSync sync;
+    private final LockMode lockMode;
+
+    /**
+     * @param lockMode the mode that is recorded in every LockInfo created by this lock
+     * @param sync the holder of the current LockInfo; its monitor is also used to serialize lock operations
+     * @param expirationTimePeriod the expiration period handed to every LockInfo created by this lock
+     * @param expirationTimeUnit the unit of <code>expirationTimePeriod</code>
+     */
+    public ReentrantDistributedLock(final LockMode lockMode, final ReadWriteLockSync sync, final long expirationTimePeriod, final TimeUnit expirationTimeUnit) {
+        this.lockMode = lockMode;
+        this.sync = sync;
+        this.expirationNanos = expirationTimeUnit.toNanos(expirationTimePeriod);
+    }
+
+    /**
+     * @return the lock count of the LockInfo stored in the sync, or 0 if nothing is stored or the stored lock has expired
+     */
+    int getClaimCount() {
+        final LockInfo currentInfo = sync.get();
+        if (currentInfo == null || currentInfo.isExpired()) {
+            return 0;
+        }
+
+        return currentInfo.getLockCount();
+    }
+
+    /**
+     * Obtains the lock, retrying indefinitely, using a randomly generated Version ID if a new lock is created.
+     *
+     * @return the Version ID of the lock that was obtained
+     */
+    @Override
+    public String lock() {
+        return lock(null);
+    }
+
+    /**
+     * Obtains the lock, retrying indefinitely.
+     *
+     * @param versionIdentifier the Version ID to use if a new lock is created, or <code>null</code> to generate one
+     * @return the Version ID of the lock that was obtained
+     */
+    @Override
+    public String lock(final String versionIdentifier) {
+        return tryLock(-1L, TimeUnit.MILLISECONDS, versionIdentifier);
+    }
+
+    /**
+     * Attempts to obtain the lock within the given amount of time, generating a Version ID if a new lock is created.
+     *
+     * @param time the maximum amount of time to keep trying; a negative value means retry indefinitely
+     * @param timeUnit the unit of <code>time</code>
+     * @return the Version ID of the lock that was obtained, or <code>null</code> if it could not be obtained in time
+     */
+    @Override
+    public String tryLock(final long time, final TimeUnit timeUnit) {
+        return tryLock(time, timeUnit, null);
+    }
+
+    /**
+     * Attempts to obtain the lock, retrying until it is obtained or the given amount of time has elapsed.
+     * At least one attempt is always made, even if the time period is zero.
+     *
+     * @param timePeriod the maximum amount of time to keep trying; a negative value means retry indefinitely
+     * @param timeUnit the unit of <code>timePeriod</code>
+     * @param versionIdentifier the Version ID to use if a new lock is created, or <code>null</code> to generate a random UUID.
+     *            It is not used when this SHARED lock joins a SHARED lock that is already held.
+     * @return the Version ID of the lock that was obtained (the existing Version ID when joining a held SHARED lock),
+     *         or <code>null</code> if the lock could not be obtained in time
+     */
+    @Override
+    public String tryLock(final long timePeriod, final TimeUnit timeUnit, final String versionIdentifier) {
+        // System.nanoTime() has an arbitrary origin and may be negative, so the sign of a computed
+        // deadline cannot double as the "no timeout" marker. Track that explicitly, and compare against
+        // the deadline by subtraction so that the check stays correct if the value wraps around.
+        final boolean waitIndefinitely = timePeriod < 0;
+        final long stopTryingTime = waitIndefinitely ? 0L : System.nanoTime() + timeUnit.toNanos(timePeriod);
+        logger.debug("Attempting to obtain {} lock with a max wait of {} {}", lockMode, timePeriod, timeUnit);
+
+        boolean firstAttempt = true;
+        while (true) {
+            if (firstAttempt) {
+                firstAttempt = false;
+            } else {
+                if (!waitIndefinitely && System.nanoTime() - stopTryingTime > 0L) {
+                    logger.debug("Failed to obtain {} lock within {} {}; returning null for tryLock", lockMode, timePeriod, timeUnit);
+                    return null;
+                }
+
+                // If not the first time we've reached this point, we want to
+                // give other threads a chance to release their locks before
+                // we enter the synchronized block.
+                Thread.yield();
+            }
+
+            synchronized (sync) {
+                final LockInfo currentInfo = sync.get();
+                logger.trace("Current Lock Info = {}", currentInfo);
+
+                if (currentInfo == null || currentInfo.isExpired()) {
+                    // There is no lock currently held. Attempt to obtain the lock.
+                    final String versionId = versionIdentifier == null ? UUID.randomUUID().toString() : versionIdentifier;
+                    final boolean updated = updateLockInfo(currentInfo, versionId, 1);
+
+                    if (updated) {
+                        // Lock has been obtained. Return the current version.
+                        logger.debug("Obtained {} lock with Version ID {}", lockMode, versionId);
+                        return versionId;
+                    } else {
+                        // Try again.
+                        logger.debug("Failed to update atomic reference. Trying again");
+                        continue;
+                    }
+                } else {
+                    // There is already a lock held. If the lock that is being held is SHARED,
+                    // and this is a SHARED lock, then we can use it.
+                    if (lockMode == LockMode.SHARED && currentInfo.getLockMode() == LockMode.SHARED) {
+                        logger.debug("Lock is already held but is a shared lock. Attempting to increment lock count");
+
+                        // lock being held is a shared lock, and this is a shared lock. We can just
+                        // update the Lock Info by incrementing the lock count and using a new expiration time.
+                        final boolean updated = updateLockInfo(currentInfo, currentInfo.getVersionId(), currentInfo.getLockCount() + 1);
+                        if (updated) {
+                            // lock info was updated. Return the current version.
+                            logger.debug("Incremented lock count. Obtained {} lock with Version ID {}", lockMode, currentInfo.getVersionId());
+                            return currentInfo.getVersionId();
+                        } else {
+                            // failed to update the lock info, so we have to start over.
+                            logger.debug("Failed to update atomic reference. Trying again");
+                            continue;
+                        }
+                    } else {
+                        // either the lock being held is a mutex or this lock requires a mutex. Either
+                        // way, we cannot enter the lock, so we retry. The top of the loop checks the
+                        // deadline and yields before this synchronized block is entered again, which
+                        // gives other threads a chance to unlock the lock.
+                        logger.debug("Cannot obtain {} lock because it is already held and cannot be shared. Trying again", lockMode);
+                        continue;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds a new LockInfo using this lock's mode and expiration period and stores it in the sync
+     * via compare-and-set.
+     *
+     * @param currentInfo the LockInfo that is expected to be stored in the sync right now (may be <code>null</code>)
+     * @param versionId the Version ID for the new LockInfo
+     * @param lockCount the lock count for the new LockInfo
+     * @return <code>true</code> if the sync was updated, <code>false</code> if it no longer held <code>currentInfo</code>
+     */
+    protected boolean updateLockInfo(final LockInfo currentInfo, final String versionId, final int lockCount) {
+        final LockInfo newInfo = new LockInfo(versionId, lockMode, lockCount, expirationNanos, TimeUnit.NANOSECONDS);
+        return sync.update(currentInfo, newInfo);
+    }
+
+    /**
+     * Verifies that the given identifier matches the lock currently stored in the sync and that the lock
+     * has not expired, and then runs the action. Both steps happen while holding the sync's monitor, so
+     * tryLock and unlock calls made through locks that share the same sync block until the action returns.
+     *
+     * @param identifier the Version ID that was returned when the lock was obtained
+     * @param action the action to perform
+     * @return the value returned by the action
+     * @throws LockExpiredException if no lock is stored, the identifier does not match, or the lock has expired
+     */
+    @Override
+    public <T> T withLock(final String identifier, final Supplier<T> action) throws LockExpiredException {
+        synchronized (sync) {
+            verifyIdentifier(identifier, sync.get());
+            return action.get();
+        }
+    }
+
+    /**
+     * Releases one hold on the lock. The lock count is decremented; when it reaches zero the LockInfo is
+     * removed from the sync, otherwise a new LockInfo with the decremented count (and a fresh expiration
+     * period) is stored.
+     *
+     * @param identifier the Version ID that was returned when the lock was obtained
+     * @throws LockExpiredException if no lock is stored, the identifier does not match, or the lock has expired
+     */
+    @Override
+    public void unlock(final String identifier) throws LockExpiredException {
+        synchronized (sync) {
+            final LockInfo info = sync.get();
+            verifyIdentifier(identifier, info);
+
+            // The result of update() is not checked: 'info' was read while holding the sync's monitor.
+            // NOTE(review): this assumes nothing updates the sync without holding that monitor - confirm.
+            final int newLockCount = info.getLockCount() - 1;
+            if (newLockCount <= 0) {
+                sync.update(info, null);
+            } else {
+                sync.update(info, new LockInfo(info.getVersionId(), lockMode, newLockCount, expirationNanos, TimeUnit.NANOSECONDS));
+            }
+        }
+    }
+
+    /**
+     * Ensures that the given LockInfo exists, carries the given Version ID and has not expired.
+     *
+     * @param identifier the Version ID supplied by the caller
+     * @param lockInfo the LockInfo currently stored in the sync (may be <code>null</code>)
+     * @throws LockExpiredException if any of the three checks fails
+     */
+    private void verifyIdentifier(final String identifier, final LockInfo lockInfo) throws LockExpiredException {
+        if (lockInfo == null) {
+            throw new LockExpiredException("No lock has been obtained");
+        }
+
+        if (!lockInfo.getVersionId().equals(identifier)) {
+            throw new LockExpiredException("Incorrect Lock ID provided. This typically means that the lock has already expired and another lock has been obtained.");
+        }
+
+        if (lockInfo.isExpired()) {
+            throw new LockExpiredException("Lock has already expired");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
index ecc725b..5911120 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
@@ -17,30 +17,16 @@
 
 package org.apache.nifi.web.revision;
 
-import java.text.Collator;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.Stack;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.FlowModification;
 import org.apache.nifi.web.InvalidRevisionException;
 import org.apache.nifi.web.Revision;
 import org.slf4j.Logger;
@@ -58,197 +44,59 @@ import org.slf4j.LoggerFactory;
 public class NaiveRevisionManager implements RevisionManager {
     private static final Logger logger = LoggerFactory.getLogger(NaiveRevisionManager.class);
 
-    private final long claimExpirationNanos;
-    private final ConcurrentMap<String, RevisionLock> revisionLockMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Revision> revisionMap = new ConcurrentHashMap<>();
 
-    public NaiveRevisionManager() {
-        this(1, TimeUnit.MINUTES);
-    }
-
-    public NaiveRevisionManager(final NiFiProperties properties) {
-        this(getRequestTimeoutMillis(properties), TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time
-     * for a Revision Claims
-     *
-     * @param claimExpiration how long a Revision Claim should last
-     * @param timeUnit the TimeUnit of 'claimExpiration'
-     */
-    public NaiveRevisionManager(final long claimExpiration, final TimeUnit timeUnit) {
-        this.claimExpirationNanos = timeUnit.toNanos(claimExpiration);
-    }
-
-    private static long getRequestTimeoutMillis(final NiFiProperties properties) {
-        return FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
-            NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public RevisionClaim requestClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException {
-        Objects.requireNonNull(user);
-        return requestClaim(Collections.singleton(revision), user);
-    }
 
     @Override
     public void reset(final Collection<Revision> revisions) {
-        final Map<String, RevisionLock> copy;
-        synchronized (this) {
-            copy = new HashMap<>(revisionLockMap);
-            revisionLockMap.clear();
+        synchronized (this) { // avoid allowing two threads to reset versions concurrently
+            // Drop every tracked revision, then repopulate the map from the given revisions, keyed by component ID.
+            // NOTE(review): getRevision() does not take this monitor, so it can observe (and insert into)
+            // the map between clear() and the puts below - confirm that this is acceptable.
+            revisionMap.clear();
 
             for (final Revision revision : revisions) {
-                revisionLockMap.put(revision.getComponentId(), new RevisionLock(new FlowModification(revision, null), claimExpirationNanos));
+                revisionMap.put(revision.getComponentId(), revision);
             }
         }
-
-        for (final RevisionLock lock : copy.values()) {
-            lock.clear();
-        }
     }
 
     @Override
     public List<Revision> getAllRevisions() {
-        return revisionLockMap.values().stream()
-            .map(lock -> lock.getRevision())
-            .collect(Collectors.toList());
-    }
-
-    @Override
-    public RevisionClaim requestClaim(final Collection<Revision> revisions, final NiFiUser user) {
-        Objects.requireNonNull(user);
-        logger.debug("Attempting to claim Revisions {}", revisions);
-
-        // Try to obtain a Revision Claim (temporary lock) on all revisions
-        final List<Revision> revisionList = new ArrayList<>(revisions);
-        revisionList.sort(new RevisionComparator());
-
-        ClaimResult failedClaimResult = null;
-        final Set<RevisionLock> locksObtained = new HashSet<>();
-        for (int i = 0; i < revisionList.size(); i++) {
-            final Revision revision = revisionList.get(i);
-            final RevisionLock revisionLock = getRevisionLock(revision);
-
-            final ClaimResult claimResult = revisionLock.requestClaim(revision, user);
-            logger.trace("Obtained Revision Claim for {}", revision);
-
-            if (claimResult.isSuccessful()) {
-                locksObtained.add(revisionLock);
-            } else {
-                logger.debug("Failed to obtain Revision Claim for component with ID {} because Current Revision is {} but supplied Revision is {}",
-                    revision.getComponentId(), claimResult.getLastModification().getRevision(), revision);
-
-                failedClaimResult = claimResult;
-                break;
-            }
-        }
-
-        // if we got a Revision Claim on each Revision, return a successful result
-        if (locksObtained.size() == revisionList.size()) {
-            logger.trace("Obtained Revision Claim for all components");
-
-            // it's possible that obtaining the locks took a while if we are obtaining
-            // many. Renew the timestamp to ensure that the first locks obtained don't
-            // expire too quickly.
-            final long timestamp = System.nanoTime() + claimExpirationNanos;
-            for (final RevisionLock revisionLock : locksObtained) {
-                revisionLock.renewExpiration(timestamp);
-            }
-
-            return new StandardRevisionClaim(revisions);
-        }
-
-        // We failed to obtain all of the Revision Claims necessary. Since
-        // we need this call to atomically obtain all or nothing, we have to now
-        // release the locks that we did obtain.
-        logger.debug("Failed to obtain all necessary Revisions; releasing claims for {}", locksObtained);
-        for (final RevisionLock revisionLock : locksObtained) {
-            revisionLock.releaseClaim();
-        }
-
-        final FlowModification lastMod = failedClaimResult.getLastModification();
-        if (lastMod.getRevision().getClientId() == null || lastMod.getRevision().getClientId().trim().isEmpty() || lastMod.getRevision().getVersion() == null) {
-            throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.",
-                failedClaimResult.getProposedRevision(), lastMod.getRevision()));
-        } else {
-            throw new InvalidRevisionException(String.format("Component %s has been updated by '%s'. Please refresh to synchronize the view.",
-                failedClaimResult.getProposedRevision().getComponentId(), lastMod.getLastModifier()));
-        }
+        // Copy the values into a new list so that the caller gets a snapshot rather than a live view of the map.
+        return new ArrayList<>(revisionMap.values());
     }
 
     @Override
     public Revision getRevision(final String componentId) {
-        final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
-        return revisionLock.getRevision();
+        // If no revision is tracked for the component yet, an initial one (version 0, null client ID) is
+        // stored in the map and returned, so this lookup can add an entry as a side effect.
+        return revisionMap.computeIfAbsent(componentId, id -> new Revision(0L, null, componentId));
     }
 
     @Override
     public <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
         Objects.requireNonNull(user);
         logger.debug("Attempting to delete revision using {}", claim);
-        int successCount = 0;
         final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
         revisionList.sort(new RevisionComparator());
 
+        // Verify that every provided revision still equals the currently tracked revision before performing the task.
         String failedId = null;
         for (final Revision revision : revisionList) {
-            final RevisionLock revisionLock = getRevisionLock(revision);
-            final boolean verified = revisionLock.requestWriteLock(revision, user);
-
-            if (verified) {
-                logger.trace("Verified Revision Claim for {}", revision);
-                successCount++;
-            } else {
-                logger.debug("Failed to verify Revision Claim for {}", revision);
-                failedId = revision.getComponentId();
-                break;
+            final Revision curRevision = getRevision(revision.getComponentId());
+            if (!curRevision.equals(revision)) {
+                // Report the ID of the offending component; 'failedId' is never assigned on this path and would print 'null'.
+                throw new ExpiredRevisionClaimException("Invalid Revision was given for component with ID '" + revision.getComponentId() + "'");
             }
         }
 
-        if (successCount == revisionList.size()) {
-            logger.debug("Successfully verified Revision Claim for all revisions {}", claim);
-
-            final T taskValue;
-            try {
-                taskValue = task.performTask();
-            } catch (final Exception e) {
-                logger.debug("Failed to perform Claim Deletion task. Will relinquish the Revision Claims for the following revisions: {}", revisionList);
-
-                for (final Revision revision : revisionList) {
-                    final RevisionLock revisionLock = getRevisionLock(revision);
-                    revisionLock.unlock(revision, revision, user.getIdentity());
-                    logger.debug("Relinquished lock for {}", revision);
-                }
-
-                throw e;
-            }
-
-            for (final Revision revision : revisionList) {
-                deleteRevisionLock(revision);
-                logger.debug("Deleted Revision {}", revision);
-            }
+        // Perform the action provided
+        final T taskResult = task.performTask();
 
-            return taskValue;
-        }
-
-        // We failed to obtain a thread lock for all revisions. Relinquish
-        // any Revision Claims that we have
-        for (int i = 0; i < successCount; i++) {
-            final Revision revision = revisionList.get(i);
-            final RevisionLock revisionLock = getRevisionLock(revision);
-            revisionLock.relinquishRevisionClaim(revision, null);
-            logger.debug("Relinquished lock for {}", revision);
+        for (final Revision revision : revisionList) {
+            revisionMap.remove(revision.getComponentId());
         }
 
-        // Throw an Exception indicating that we failed to obtain the locks
-        throw new ExpiredRevisionClaimException("Invalid Revision was given for component with ID '" + failedId + "'");
+        return taskResult;
     }
 
     @Override
     public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
         Objects.requireNonNull(user);
-        int successCount = 0;
         logger.debug("Attempting to update revision using {}", originalClaim);
 
         final List<Revision> revisionList = new ArrayList<>(originalClaim.getRevisions());
@@ -256,523 +104,55 @@ public class NaiveRevisionManager implements RevisionManager {
 
         String failedId = null;
         for (final Revision revision : revisionList) {
-            final RevisionLock revisionLock = getRevisionLock(revision);
-            final boolean verified = revisionLock.requestWriteLock(revision, user);
+            final Revision currentRevision = getRevision(revision.getComponentId());
+            final boolean verified = revision.equals(currentRevision);
 
-            if (verified) {
-                logger.trace("Verified Revision Claim for {}", revision);
-                successCount++;
-            } else {
-                logger.debug("Failed to verify Revision Claim for {}", revision);
-                failedId = revision.getComponentId();
-                break;
+            if (!verified) {
+                // Throw an Exception indicating that we failed to obtain the locks
+                throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'");
             }
         }
 
         // We successfully verified all revisions.
-        if (successCount == revisionList.size()) {
-            logger.debug("Successfully verified Revision Claim for all revisions");
-
-            RevisionUpdate<T> updatedComponent = null;
-            try {
-                updatedComponent = task.update();
-            } finally {
-                // Release the lock that we are holding and update the revision.
-                // To do this, we need to map the old revision to the new revision
-                // so that we have an efficient way to lookup the pairing, so that
-                // we can easily obtain the old revision and the new revision for
-                // the same component in order to call #unlock on the RevisionLock
-                final Map<Revision, Revision> updatedRevisions = new HashMap<>();
-                final Map<String, Revision> revisionsByComponentId = new HashMap<>();
-                for (final Revision revision : revisionList) {
-                    updatedRevisions.put(revision, revision);
-                    revisionsByComponentId.put(revision.getComponentId(), revision);
-                }
-
-                if (updatedComponent != null) {
-                    for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) {
-                        final Revision oldRevision = revisionsByComponentId.get(updatedRevision.getComponentId());
-                        if (oldRevision != null) {
-                            updatedRevisions.put(oldRevision, updatedRevision);
-                        }
-                    }
-                }
-
-                for (final Revision revision : revisionList) {
-                    final Revision updatedRevision = updatedRevisions.get(revision);
-                    getRevisionLock(revision).unlock(revision, updatedRevision, user.getIdentity());
-
-                    if (updatedRevision.getVersion() != revision.getVersion()) {
-                        logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
-                    } else {
-                        logger.debug("Unlocked Revision {} without updating Version", revision);
-                    }
-                }
-            }
-
-            return updatedComponent;
-        }
-
-        // We failed to obtain a thread lock for all revisions. Relinquish
-        // any Revision Claims that we have
-        for (int i = 0; i < successCount; i++) {
-            final Revision revision = revisionList.get(i);
-            final RevisionLock revisionLock = getRevisionLock(revision);
-            revisionLock.cancelWriteLock();
-            logger.debug("Relinquished lock for {}", revision);
-        }
-
-        // Throw an Exception indicating that we failed to obtain the locks
-        throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'");
-    }
-
-    @Override
-    public boolean releaseClaim(final RevisionClaim claim, final NiFiUser user) {
-        Objects.requireNonNull(user);
-        boolean success = true;
-
-        final List<Revision> revisions = new ArrayList<>(claim.getRevisions());
-        revisions.sort(new RevisionComparator());
-
-        for (final Revision revision : revisions) {
-            final RevisionLock revisionLock = getRevisionLock(revision);
-            success = revisionLock.relinquishRevisionClaim(revision, user) && success;
-        }
-
-        return success;
-    }
-
-    @Override
-    public boolean cancelClaim(String componentId) {
-        logger.debug("Attempting to cancel claim for component {}", componentId);
-        final Revision revision = new Revision(0L, null, componentId);
-
-        final RevisionLock revisionLock = getRevisionLock(revision);
-        if (revisionLock == null) {
-            logger.debug("No Revision Lock exists for Component {} - there is no claim to cancel", componentId);
-            return false;
-        }
-
-        return revisionLock.releaseClaimIfCurrentThread(null);
-    }
-
-    @Override
-    public boolean cancelClaim(Revision revision) {
-        logger.debug("Attempting to cancel claim for {}", revision);
-
-        final RevisionLock revisionLock = getRevisionLock(revision);
-        if (revisionLock == null) {
-            logger.debug("No Revision Lock exists for {} - there is no claim to cancel", revision);
-            return false;
-        }
-
-        return revisionLock.releaseClaimIfCurrentThread(revision);
-    }
-
-    @Override
-    public boolean cancelClaims(final Set<Revision> revisions) {
-        boolean successful = false;
-        for (final Revision revision : revisions) {
-            successful = cancelClaim(revision);
-        }
-
-        return successful;
-    }
-
-    @Override
-    public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) {
-        final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
-        logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision());
-        if (logger.isTraceEnabled()) {
-            logger.trace("Attempting to obtain read lock due to following stack trace", new RuntimeException("Exception for generating stack trace for debugging purposes"));
-        }
-
-        revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
-        logger.debug("Obtained read lock for {}", revisionLock.getRevision());
-
-        try {
-            return callback.withRevision(revisionLock.getRevision());
-        } finally {
-            logger.debug("Releasing read lock for {}", revisionLock.getRevision());
-            revisionLock.relinquishReadLock();
-        }
-    }
-
-    @Override
-    public <T> T get(final Set<String> componentIds, final Supplier<T> callback) {
-        final List<String> sortedIds = new ArrayList<>(componentIds);
-        sortedIds.sort(Collator.getInstance());
-
-        final Stack<RevisionLock> revisionLocks = new Stack<>();
-
-        logger.debug("Will attempt to obtain read locks for components {}", componentIds);
-        if (logger.isTraceEnabled()) {
-            logger.trace("Attempting to obtain read lock due to following stack trace", new RuntimeException("Exception for generating stack trace for debugging purposes"));
-        }
-
-        for (final String componentId : sortedIds) {
-            final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
-
-            logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision());
-            revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
-            revisionLocks.push(revisionLock);
-            logger.trace("Obtained read lock for {}", revisionLock.getRevision());
-        }
+        logger.debug("Successfully verified Revision Claim for all revisions");
 
-        logger.debug("Obtained read lock for all necessary components {}; calling call-back", componentIds);
+        RevisionUpdate<T> updatedComponent = null;
         try {
-            return callback.get();
+            updatedComponent = task.update();
         } finally {
-            while (!revisionLocks.isEmpty()) {
-                final RevisionLock lock = revisionLocks.pop();
-                logger.debug("Releasing read lock for {}", lock.getRevision());
-                lock.relinquishReadLock();
-            }
-        }
-    }
-
-    private synchronized void deleteRevisionLock(final Revision revision) {
-        final RevisionLock revisionLock = revisionLockMap.remove(revision.getComponentId());
-        if (revisionLock == null) {
-            return;
-        }
-
-        revisionLock.releaseClaim();
-    }
-
-    private synchronized RevisionLock getRevisionLock(final Revision revision) {
-        return revisionLockMap.computeIfAbsent(revision.getComponentId(), id -> new RevisionLock(new FlowModification(revision, null), claimExpirationNanos));
-    }
-
-
-    private static class RevisionLock {
-        private final AtomicReference<FlowModification> lastModReference = new AtomicReference<>();
-        private final AtomicReference<LockStamp> lockStamp = new AtomicReference<>();
-        private final long lockNanos;
-        private final ReadWriteLock threadLock = new ReentrantReadWriteLock();
-
-        public RevisionLock(final FlowModification lastMod, final long lockNanos) {
-            this.lockNanos = lockNanos;
-            lastModReference.set(lastMod);
-        }
-
-        /**
-         * Requests that a Revision Claim be granted for the proposed Revision
-         *
-         * @param proposedRevision the revision to obtain a Claim for
-         *
-         * @return <code>true</code> if the Revision is valid and a Claim has been granted, <code>false</code> otherwise
-         */
-        public ClaimResult requestClaim(final Revision proposedRevision, final NiFiUser user) {
-            // acquire the claim, blocking if necessary.
-            acquireClaim(user, proposedRevision.getClientId());
-
-            threadLock.writeLock().lock();
-            try {
-                // check if the revision is correct
-                final FlowModification lastModification = lastModReference.get();
-
-                final Revision currentRevision = lastModification.getRevision();
-                if (proposedRevision.equals(currentRevision)) {
-                    // revision is correct - return true
-                    return new ClaimResult(true, lastModification, proposedRevision);
-                }
-
-                // revision is incorrect. Release the Claim and return false
-                releaseClaim();
-                logger.debug("Cannot obtain Revision Claim {} because the Revision is out-of-date. Current revision is {}", proposedRevision, currentRevision);
-                return new ClaimResult(false, lastModification, proposedRevision);
-            } finally {
-                threadLock.writeLock().unlock();
-            }
-        }
-
-        /**
-         * Verifies that the given Revision has a Claim against it already and that the Claim belongs
-         * to the same client as the given Revision. If so, upgrades the Revision Claim to a lock that
-         * will not be relinquished until the {@link #unlock(Revision)} method is called.
-         *
-         * @param proposedRevision the current Revision
-         * @return <code>true</code> if the Revision Claim was upgraded to a lock, <code>false</code> otherwise
-         * @throws ExpiredRevisionClaimException if the Revision Claim for the given Revision has already expired
-         */
-        public boolean requestWriteLock(final Revision proposedRevision, final NiFiUser user) throws ExpiredRevisionClaimException {
-            Objects.requireNonNull(proposedRevision);
-            threadLock.writeLock().lock();
-
-            boolean releaseLock = true;
-            try {
-                if (getRevision().equals(proposedRevision)) {
-                    final LockStamp stamp = lockStamp.get();
-
-                    if (stamp == null) {
-                        final IllegalStateException ise = new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification");
-                        logger.debug("Attempted to obtain write lock for {} but no Claim was obtained; throwing IllegalStateException", proposedRevision, ise);
-                        throw ise;
-                    }
-
-                    final boolean userEqual = stamp.getUser() == null || stamp.getUser().equals(user);
-                    if (!userEqual) {
-                        logger.debug("Failed to verify {} because the User was not the same as the Lock Stamp's User (Lock Stamp was {})", proposedRevision, stamp);
-                        throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed by " + stamp.getUser());
-                    }
-
-                    final boolean clientIdEqual = stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId());
-                    if (!clientIdEqual) {
-                        logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
-                        throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed with a different Client ID");
-                    }
-
-                    // TODO - Must make sure that we don't have an expired stamp if it is the result of another
-                    // operation taking a long time. I.e., Client A fires off two requests for Component X. If the
-                    // first one takes 2 minutes to complete, it should not result in the second request getting
-                    // rejected. I.e., we want to ensure that if the request is received before the Claim expired,
-                    // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
-                    // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
-                    // not fulfill the second phase of the two-phase commit.
-                    // We may need a Queue of updates (queue would need to be bounded, with a request getting
-                    // rejected if queue is full).
-                    if (stamp.isExpired()) {
-                        throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
-                    }
-
-                    // Intentionally leave the thread lock in a locked state!
-                    releaseLock = false;
-                    return true;
-                }
-            } finally {
-                if (releaseLock) {
-                    threadLock.writeLock().unlock();
-                }
+            // Release the lock that we are holding and update the revision.
+            // To do this, we need to map the old revision to the new revision
+            // so that we have an efficient way to lookup the pairing, so that
+            // we can easily obtain the old revision and the new revision for
+            // the same component in order to call #unlock on the RevisionLock
+            final Map<Revision, Revision> updatedRevisions = new HashMap<>();
+            final Map<String, Revision> revisionsByComponentId = new HashMap<>();
+            for (final Revision revision : revisionList) {
+                updatedRevisions.put(revision, revision);
+                revisionsByComponentId.put(revision.getComponentId(), revision);
             }
 
-            return false;
-        }
-
-        private void acquireClaim(final NiFiUser user, final String clientId) {
-            while (true) {
-                final LockStamp stamp = lockStamp.get();
-
-                if (stamp == null || stamp.isExpired()) {
-                    final long now = System.nanoTime();
-                    final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(user, clientId, now + lockNanos));
-                    if (lockObtained) {
-                        return;
+            if (updatedComponent != null) {
+                for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) {
+                    final Revision oldRevision = revisionsByComponentId.get(updatedRevision.getComponentId());
+                    if (oldRevision != null) {
+                        updatedRevisions.put(oldRevision, updatedRevision);
                     }
-                } else {
-                    Thread.yield();
-                }
-            }
-        }
-
-        public void acquireReadLock(final NiFiUser user, final String clientId) {
-            // Wait until we can claim the lock stamp
-            boolean obtained = false;
-            while (!obtained) {
-                // If the lock stamp is not null, then there is either an active Claim or a
-                // write lock held. Wait until it is null and then replace it atomically
-                // with a LockStamp that does not expire (expiration time is Long.MAX_VALUE).
-                final LockStamp curStamp = lockStamp.get();
-                final boolean nullOrExpired = (curStamp == null || curStamp.isExpired());
-                obtained = nullOrExpired && lockStamp.compareAndSet(curStamp, new LockStamp(user, clientId, Long.MAX_VALUE));
-
-                if (!obtained) {
-                    // Could not obtain lock. Yield so that we don't sit around doing nothing with the thread.
-                    Thread.yield();
                 }
             }
 
-            // Now we can obtain the read lock without problem.
-            threadLock.readLock().lock();
-        }
-
-        public void relinquishReadLock() {
-            lockStamp.set(null);
-            threadLock.readLock().unlock();
-        }
-
-        private void releaseClaim() {
-            lockStamp.set(null);
-        }
-
-        public void clear() {
-            threadLock.writeLock().lock();
-            try {
-                releaseClaim();
-            } finally {
-                threadLock.writeLock().unlock();
-            }
-        }
-
-        public boolean releaseClaimIfCurrentThread(final Revision revision) {
-            threadLock.writeLock().lock();
-            try {
-                final LockStamp stamp = lockStamp.get();
-                if (stamp == null) {
-                    logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision());
-                    return false;
-                }
-
-                if (revision != null && !getRevision().equals(revision)) {
-                    throw new InvalidRevisionException("Cannot release claim because the provided Revision is not valid");
-                }
-
-                if (stamp.isObtainedByCurrentThread()) {
-                    releaseClaim();
-                    logger.debug("Successfully canceled claim for {}", getRevision());
-                    return true;
-                }
-
-                logger.debug("Cannot cancel claim for {} because it is held by Thread {} and current Thread is {}",
-                    getRevision(), stamp.obtainingThread, Thread.currentThread().getName());
-                return false;
-            } finally {
-                threadLock.writeLock().unlock();
-            }
-        }
+            for (final Revision revision : revisionList) {
+                final Revision updatedRevision = updatedRevisions.get(revision);
+                revisionMap.put(updatedRevision.getComponentId(), updatedRevision);
 
-        /**
-         * Releases the Revision Claim if and only if the current revision matches the proposed revision
-         *
-         * @param proposedRevision the proposed revision to check against the current revision
-         * @return <code>true</code> if the Revision Claim was relinquished, <code>false</code> otherwise
-         */
-        public boolean relinquishRevisionClaim(final Revision proposedRevision, final NiFiUser user) {
-            threadLock.writeLock().lock();
-            try {
-                final LockStamp stamp = lockStamp.get();
-                final boolean userOk = stamp == null || stamp.getUser().equals(user);
-                if (userOk) {
-                    if (getRevision().equals(proposedRevision)) {
-                        releaseClaim();
-                        return true;
-                    }
+                if (updatedRevision.getVersion() != revision.getVersion()) {
+                    logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
                 } else {
-                    throw new InvalidRevisionException("Cannot relinquish claim for " + proposedRevision + " because it was claimed by " + stamp.getUser());
+                    logger.debug("Unlocked Revision {} without updating Version", revision);
                 }
-
-                return false;
-            } finally {
-                threadLock.writeLock().unlock();
-            }
-        }
-
-        /**
-         * Releases the lock and any Revision Claim that is held for the given Revision and
-         * updates the revision
-         *
-         * @param proposedRevision the current Revision
-         * @param updatedRevision the Revision to update the current revision to
-         */
-        public void unlock(final Revision proposedRevision, final Revision updatedRevision, final String modifier) {
-            final Revision curRevision = getRevision();
-            if (curRevision == null) {
-                throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because it is not locked");
-            }
-
-            if (!curRevision.equals(proposedRevision)) {
-                // Intentionally leave the thread lock in a locked state!
-                throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because the version is not valid");
             }
-
-            lastModReference.set(new FlowModification(updatedRevision, modifier));
-
-            // Set stamp to null to indicate that it is not locked.
-            releaseClaim();
-
-            // Thread Lock should already be locked if this is called.
-            threadLock.writeLock().unlock();
-        }
-
-        public void cancelWriteLock() {
-            releaseClaim();
-            threadLock.writeLock().unlock();
-        }
-
-        /**
-         * Updates expiration time to the given timestamp
-         *
-         * @param timestamp the new expiration timestamp in nanoseconds
-         */
-        public void renewExpiration(final long timestamp) {
-            final LockStamp stamp = lockStamp.get();
-
-            final NiFiUser user;
-            final String clientId;
-            if (stamp == null) {
-                user = null;
-                clientId = null;
-            } else {
-                user = stamp.getUser();
-                clientId = stamp.getClientId();
-            }
-
-            lockStamp.set(new LockStamp(user, clientId, timestamp));
         }
 
-        public Revision getRevision() {
-            final FlowModification lastMod = lastModReference.get();
-            return (lastMod == null) ? null : lastMod.getRevision();
-        }
-    }
-
-
-    private static class LockStamp {
-        private final NiFiUser user;
-        private final String clientId;
-        private final long expirationTimestamp;
-        private final Thread obtainingThread;
-
-        public LockStamp(final NiFiUser user, final String clientId, final long expirationTimestamp) {
-            this.user = user;
-            this.clientId = clientId;
-            this.expirationTimestamp = expirationTimestamp;
-            this.obtainingThread = Thread.currentThread();
-        }
-
-        public NiFiUser getUser() {
-            return user;
-        }
-
-        public String getClientId() {
-            return clientId;
-        }
-
-        public boolean isExpired() {
-            return System.nanoTime() > expirationTimestamp;
-        }
-
-        public boolean isObtainedByCurrentThread() {
-            return obtainingThread == Thread.currentThread();
-        }
-
-        @Override
-        public String toString() {
-            return "LockStamp[user=" + user + ", clientId=" + clientId + ", expired=" + isExpired() + "]";
-        }
-    }
-
-    private static class ClaimResult {
-        private final boolean successful;
-        private final FlowModification lastMod;
-        private final Revision proposedRevision;
-
-        public ClaimResult(final boolean successful, final FlowModification lastMod, final Revision proposedRevision) {
-            this.successful = successful;
-            this.lastMod = lastMod;
-            this.proposedRevision = proposedRevision;
-        }
-
-        public boolean isSuccessful() {
-            return successful;
-        }
-
-        public FlowModification getLastModification() {
-            return lastMod;
-        }
-
-        public Revision getProposedRevision() {
-            return proposedRevision;
-        }
+        return updatedComponent;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
new file mode 100644
index 0000000..8027614
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ReentrantDistributedLock}. The tests create locks in
+ * {@link LockMode#SHARED} mode ("read" locks) and {@link LockMode#MUTUALLY_EXCLUSIVE}
+ * mode ("write" locks) on top of a single {@link ReadWriteLockSync} and verify how
+ * they block, expire and count claims.
+ */
+public class TestReentrantDistributedLock {
+    /** Timeout, in milliseconds, handed to tryLock when the lock is expected to stay unavailable. */
+    private static final int TRY_LOCK_TIMEOUT_MILLIS = 500;
+
+    /**
+     * Minimum time, in milliseconds, that a timed-out tryLock must have blocked. Deliberately
+     * smaller than {@link #TRY_LOCK_TIMEOUT_MILLIS} because the time periods won't be exact.
+     */
+    private static final long MIN_BLOCKED_MILLIS = 350L;
+
+    private ReadWriteLockSync sync;
+
+    /** Gives every test its own sync object. */
+    @Before
+    public void setup() {
+        sync = new ReadWriteLockSync();
+    }
+
+    /**
+     * Locking a shared lock twice must return the same ID both times, and the claim
+     * count must go up by one per lock and down by one per unlock.
+     */
+    @Test(timeout = 5000)
+    public void testMultipleReadLocks() throws LockExpiredException {
+        final ReentrantDistributedLock lock = createReadLock();
+        final String id1 = lock.lock();
+        final String id2 = lock.lock();
+        assertEquals(id1, id2);
+
+        assertEquals(2, lock.getClaimCount());
+        lock.unlock(id1);
+        assertEquals(1, lock.getClaimCount());
+        lock.unlock(id2);
+        assertEquals(0, lock.getClaimCount());
+    }
+
+    /**
+     * While a mutually exclusive lock is held, a second tryLock on it must time out;
+     * once it is unlocked, tryLock must succeed and hand out a different ID.
+     */
+    @Test(timeout = 10000)
+    public void testMultipleWriteLocksBlock() throws LockExpiredException {
+        final ReentrantDistributedLock lock = createWriteLock();
+        final String id1 = lock.lock();
+        assertNotNull(id1);
+
+        final long startTime = System.nanoTime();
+        final String id2 = lock.tryLock(TRY_LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        assertNull(id2);
+        assertBlockedUntilTimeout(startTime);
+
+        lock.unlock(id1);
+        final String id3 = lock.tryLock(TRY_LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        assertNotNull(id3);
+        // Compare by value: these are Strings, so a reference comparison would prove nothing.
+        assertNotEquals(id1, id3);
+        lock.unlock(id3);
+    }
+
+    /**
+     * While the shared lock is held, tryLock on the mutually exclusive lock must time out;
+     * after the shared lock is released, the mutually exclusive lock can be obtained.
+     */
+    @Test(timeout = 10000)
+    public void testReadLockBlocksWriteLock() throws LockExpiredException {
+        final ReentrantDistributedLock readLock = createReadLock();
+        final ReentrantDistributedLock writeLock = createWriteLock();
+
+        final String id1 = readLock.lock();
+        assertNotNull(id1);
+
+        final long startTime = System.nanoTime();
+        final String id2 = writeLock.tryLock(TRY_LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        assertNull(id2);
+        assertBlockedUntilTimeout(startTime);
+
+        readLock.unlock(id1);
+
+        final String id3 = writeLock.lock();
+        assertNotNull(id3);
+        assertNotEquals(id1, id3);
+
+        writeLock.unlock(id3);
+    }
+
+    /**
+     * While the mutually exclusive lock is held, tryLock on the shared lock must time out;
+     * after the mutually exclusive lock is released, the shared lock can be obtained.
+     */
+    @Test(timeout = 10000)
+    public void testWriteLockBlocksReadLock() throws LockExpiredException {
+        final ReentrantDistributedLock readLock = createReadLock();
+        final ReentrantDistributedLock writeLock = createWriteLock();
+
+        final String id1 = writeLock.lock();
+        assertNotNull(id1);
+
+        final long startTime = System.nanoTime();
+        final String id2 = readLock.tryLock(TRY_LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        assertNull(id2);
+        assertBlockedUntilTimeout(startTime);
+
+        writeLock.unlock(id1);
+
+        final String id3 = readLock.lock();
+        assertNotNull(id3);
+        assertNotEquals(id1, id3);
+
+        readLock.unlock(id3);
+    }
+
+    /**
+     * Three threads request the shared lock while the mutually exclusive lock is held. None
+     * of them may obtain it until the mutually exclusive lock is released; afterwards all
+     * three must have received the same ID and the claim count must be three.
+     */
+    @Test(timeout = 10000)
+    public void testMultipleReadLocksBlockingOnWriteLock() throws InterruptedException, LockExpiredException {
+        final ReentrantDistributedLock readLock = createReadLock();
+        final ReentrantDistributedLock writeLock = createWriteLock();
+
+        final String id1 = writeLock.lock();
+        assertNotNull(id1);
+
+        final int readerCount = 3;
+        final AtomicReferenceArray<String> array = new AtomicReferenceArray<>(readerCount);
+        final ExecutorService executor = Executors.newFixedThreadPool(readerCount);
+        try {
+            for (int i = 0; i < readerCount; i++) {
+                final int index = i;
+                // No assertions inside the task: an AssertionError thrown on a pool thread is
+                // captured by the Future returned from submit(), which is discarded here, so it
+                // could never fail the test. The IDs are verified on the test thread below.
+                final Runnable reader = () -> array.set(index, readLock.lock());
+                executor.submit(reader);
+            }
+
+            // wait a bit and then make sure that no values have been set
+            Thread.sleep(250L);
+            for (int i = 0; i < readerCount; i++) {
+                assertNull(array.get(i));
+            }
+
+            // unlock so that the readers can lock.
+            writeLock.unlock(id1);
+
+            executor.shutdown();
+            // Stay below the test timeout so that a stuck reader is reported by this assertion.
+            assertTrue("Readers did not obtain the lock after the write lock was released",
+                executor.awaitTermination(5, TimeUnit.SECONDS));
+        } finally {
+            // Has no effect after a clean shutdown. If an assertion above failed, this interrupts
+            // the pool threads (best effort) instead of leaving the pool running.
+            executor.shutdownNow();
+        }
+
+        final String id = array.get(0);
+        assertNotNull(id);
+        for (int i = 0; i < readerCount; i++) {
+            assertEquals(id, array.get(i));
+        }
+
+        for (int i = 0; i < readerCount; i++) {
+            assertEquals(readerCount - i, readLock.getClaimCount());
+            readLock.unlock(id);
+        }
+
+        assertEquals(0, readLock.getClaimCount());
+    }
+
+    /**
+     * A lock created with a 25 millisecond expiration is locked and never unlocked. A second
+     * lock() call must still return, with a different ID, after roughly that period.
+     */
+    @Test(timeout = 10000)
+    public void testLockExpires() {
+        final ReentrantDistributedLock lock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 25, TimeUnit.MILLISECONDS);
+        final String id1 = lock.lock();
+        assertNotNull(id1);
+
+        final long start = System.nanoTime();
+        final String id2 = lock.lock();
+        final long nanos = System.nanoTime() - start;
+
+        assertNotNull(id2);
+        assertNotEquals(id1, id2);
+
+        // The timeout may not entirely elapse but will be close. Give 5 milliseconds buffer
+        assertTrue("Second lock was obtained after only " + nanos + " nanos", nanos > TimeUnit.MILLISECONDS.toNanos(20));
+    }
+
+    /**
+     * withLock must return exactly the object produced by the supplied action and must
+     * leave the claim count unchanged.
+     */
+    @Test(timeout = 10000)
+    public void testWithLock() throws Exception {
+        final ReentrantDistributedLock lock = createWriteLock();
+        final String id = lock.lock();
+        assertEquals(1, lock.getClaimCount());
+
+        final Object obj = new Object();
+        final Object returned = lock.withLock(id, () -> obj);
+        assertSame(obj, returned);
+        assertEquals(1, lock.getClaimCount());
+        lock.unlock(id);
+        assertEquals(0, lock.getClaimCount());
+    }
+
+    /**
+     * Asserts that more than {@link #MIN_BLOCKED_MILLIS} milliseconds have passed since the
+     * given start time, i.e. that a tryLock call which returned null really waited.
+     *
+     * @param startNanos value of {@link System#nanoTime()} taken just before tryLock was called
+     */
+    private static void assertBlockedUntilTimeout(final long startNanos) {
+        // We don't know exactly how long it will take to timeout because the time periods
+        // won't be exact, but it should take more than MIN_BLOCKED_MILLIS milliseconds.
+        final long nanos = System.nanoTime() - startNanos;
+        assertTrue("tryLock gave up after only " + nanos + " nanos", nanos > TimeUnit.MILLISECONDS.toNanos(MIN_BLOCKED_MILLIS));
+    }
+
+    /** Creates a {@link LockMode#SHARED} lock on the test's sync with a 30 second expiration. */
+    private ReentrantDistributedLock createReadLock() {
+        return new ReentrantDistributedLock(LockMode.SHARED, sync, 30, TimeUnit.SECONDS);
+    }
+
+    /** Creates a {@link LockMode#MUTUALLY_EXCLUSIVE} lock on the test's sync with a 30 second expiration. */
+    private ReentrantDistributedLock createWriteLock() {
+        return new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 30, TimeUnit.SECONDS);
+    }
+}