You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/06 18:36:46 UTC
[2/4] nifi git commit: NIFI-2170: Refactor RevisionManager into a
RevisionManager and a DistributedLockingManager. This closes #610
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index b42f839..794c5a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -52,6 +52,7 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.concurrent.LockExpiredException;
import org.apache.nifi.web.util.ClientResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -410,17 +411,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
}
processor = entity.getComponent();
} else {
- // claim the revision
- serviceFacade.claimRevision(revision, user);
+ // update processor within write lock
+ final String writeLockId = serviceFacade.obtainWriteLock();
try {
-
- ProcessorDTO processorDTO = buildProcessorDto(id,annotationData,properties);
- final ProcessorEntity entity = serviceFacade.updateProcessor(revision,processorDTO);
- processor = entity.getComponent();
-
+ processor = serviceFacade.withWriteLock(writeLockId, () -> {
+ ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
+ final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
+ return entity.getComponent();
+ });
} finally {
- // ensure the revision is canceled.. if the operation succeed, this is a noop
- serviceFacade.cancelRevision(revision);
+ // ensure the lock is released
+ try {
+ serviceFacade.releaseWriteLock(writeLockId);
+ } catch (final LockExpiredException e) {
+ }
}
}
@@ -565,15 +569,19 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
controllerServiceDto.setAnnotationData(annotationData);
controllerServiceDto.setProperties(properties);
- // claim the revision
- serviceFacade.claimRevision(revision, user);
+ // update controller service within write lock
+ final String writeLockId = serviceFacade.obtainWriteLock();
try {
- // perform the update
- final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
- controllerService = entity.getComponent();
+ controllerService = serviceFacade.withWriteLock(writeLockId, () -> {
+ final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
+ return entity.getComponent();
+ });
} finally {
- // ensure the revision is canceled.. if the operation succeed, this is a noop
- serviceFacade.cancelRevision(revision);
+ // ensure the lock is released
+ try {
+ serviceFacade.releaseWriteLock(writeLockId);
+ } catch (final LockExpiredException e) {
+ }
}
} else {
// if this is a standalone instance the service should have been found above... there should
@@ -733,14 +741,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
reportingTaskDto.setAnnotationData(annotationData);
reportingTaskDto.setProperties(properties);
- // claim the revision
- serviceFacade.claimRevision(revision, user);
+ // obtain write lock
+ final String writeLockId = serviceFacade.obtainWriteLock();
try {
- final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
- reportingTask = entity.getComponent();
+ reportingTask = serviceFacade.withWriteLock(writeLockId, () -> {
+ serviceFacade.verifyRevision(revision, user);
+ final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
+ return entity.getComponent();
+ });
} finally {
- // ensure the revision is canceled.. if the operation succeed, this is a noop
- serviceFacade.cancelRevision(revision);
+ // ensure the lock is released
+ try {
+ serviceFacade.releaseWriteLock(writeLockId);
+ } catch (final LockExpiredException e) {
+ }
}
} else {
// if this is a standalone instance the task should have been found above... there should
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 120c387..6fdca03 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,10 +16,31 @@
*/
package org.apache.nifi.web.api;
-import com.sun.jersey.api.core.HttpContext;
-import com.sun.jersey.api.representation.Form;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
+import javax.ws.rs.core.UriInfo;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
@@ -40,32 +61,14 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.concurrent.LockExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.CacheControl;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriBuilderException;
-import javax.ws.rs.core.UriInfo;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import com.sun.jersey.api.core.HttpContext;
+import com.sun.jersey.api.representation.Form;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
/**
* Base class for controllers.
@@ -343,8 +346,8 @@ public abstract class ApplicationResource {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
}
- protected boolean isClaimCancelationPhase(final HttpServletRequest httpServletRequest) {
- return httpServletRequest.getHeader(RequestReplicator.CLAIM_CANCEL_HEADER) != null;
+ protected boolean isLockCancelationPhase(final HttpServletRequest httpServletRequest) {
+ return httpServletRequest.getHeader(RequestReplicator.LOCK_CANCELATION_HEADER) != null;
}
/**
@@ -444,9 +447,7 @@ public abstract class ApplicationResource {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return withWriteLock(serviceFacade, authorizer, verifier, action,
- () -> serviceFacade.claimRevision(revision, user),
- () -> serviceFacade.cancelRevision(revision),
- () -> serviceFacade.releaseRevisionClaim(revision, user));
+ () -> serviceFacade.verifyRevision(revision, user));
}
/**
@@ -463,56 +464,76 @@ public abstract class ApplicationResource {
final Runnable verifier, final Supplier<Response> action) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return withWriteLock(serviceFacade, authorizer, verifier, action,
- () -> serviceFacade.claimRevisions(revisions, user),
- () -> serviceFacade.cancelRevisions(revisions),
- () -> serviceFacade.releaseRevisionClaims(revisions, user));
+ () -> serviceFacade.verifyRevisions(revisions, user));
}
/**
* Executes an action through the service facade using the specified revision.
*
- * @param serviceFacade service facade
- * @param authorizer authorizer
- * @param verifier verifier
- * @param action the action to execute
- * @param claimRevision a callback that will claim the necessary revisions for the operation
- * @param cancelRevision a callback that will cancel the necessary revisions if the operation fails
- * @param releaseClaim a callback that will release any previously claimed revision if the operation is canceled after the first phase
+ * @param serviceFacade service facade
+ * @param authorizer authorizer
+ * @param verifier verifier
+ * @param action the action to execute
+ * @param verifyRevision a callback that will verify the necessary revisions for the operation
* @return the response
*/
private Response withWriteLock(
final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action,
- final Runnable claimRevision, final Runnable cancelRevision, final Runnable releaseClaim) {
+ final Runnable verifyRevision) {
+
+ if (isLockCancelationPhase(httpServletRequest)) {
+ final String lockVersionId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
+ try {
+ serviceFacade.releaseWriteLock(lockVersionId);
+ } catch (final Exception e) {
+ // If the lock has expired, then it has already been unlocked.
+ }
- if (isClaimCancelationPhase(httpServletRequest)) {
- releaseClaim.run();
return generateOkResponse().build();
}
- final boolean validationPhase = isValidationPhase(httpServletRequest);
- if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
- // authorize access
- serviceFacade.authorizeAccess(authorizer);
- claimRevision.run();
- }
-
+ String lockId = null;
try {
+ final boolean validationPhase = isValidationPhase(httpServletRequest);
+ if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
+ // authorize access
+ serviceFacade.authorizeAccess(authorizer);
+
+ lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
+ lockId = serviceFacade.obtainWriteLock(lockId);
+ verifyRevision.run();
+ } else {
+ lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
+ }
+
if (validationPhase) {
if (verifier != null) {
verifier.run();
}
return generateContinueResponse().build();
}
- } catch (final Exception e) {
- cancelRevision.run();
- throw e;
- }
- try {
- return action.get();
- } finally {
- cancelRevision.run();
+ try {
+ return serviceFacade.withWriteLock(lockId, () -> action.get());
+ } finally {
+ try {
+ serviceFacade.releaseWriteLock(lockId);
+ } catch (final LockExpiredException e) {
+ // If the lock expires here, it's okay. We've already completed our action,
+ // so the expiration of the lock is of no consequence to us.
+ }
+ }
+ } catch (final RuntimeException t) {
+ if (lockId != null) {
+ try {
+ serviceFacade.releaseWriteLock(lockId);
+ } catch (final Exception e) {
+ t.addSuppressed(e);
+ }
+ }
+
+ throw t;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
new file mode 100644
index 0000000..56c87e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.config;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.web.concurrent.LockExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps a {@link LockExpiredException} to a {@code CONFLICT} response whose plain-text
+ * entity is the exception message.
+ */
+@Provider
+public class LockExpiredExceptionMapper implements ExceptionMapper<LockExpiredException> {
+    // Use this mapper's own class as the logger category so that lock-expiry warnings are not
+    // attributed to InvalidRevisionExceptionMapper.
+    private static final Logger logger = LoggerFactory.getLogger(LockExpiredExceptionMapper.class);
+
+    /**
+     * Logs the exception and builds the response returned to the client.
+     *
+     * @param exception the exception being mapped
+     * @return a {@code text/plain} response with status {@code CONFLICT} carrying the exception message
+     */
+    @Override
+    public Response toResponse(LockExpiredException exception) {
+        // log the error
+        logger.warn(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT));
+
+        // the stack trace is only of interest when debugging
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, exception);
+        }
+
+        return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index a1a63a6..15d2f0f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -33,7 +33,6 @@
<!-- revision manager -->
<bean id="revisionManager" class="org.apache.nifi.web.revision.NaiveRevisionManager">
- <constructor-arg ref="nifiProperties"></constructor-arg>
</bean>
<!-- content access -->
@@ -164,6 +163,11 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="heartbeatMonitor" ref="heartbeatMonitor" />
<property name="bulletinRepository" ref="bulletinRepository"/>
+ <property name="lockManager" ref="lockManager" />
+ </bean>
+
+ <bean id="lockManager" class="org.apache.nifi.web.concurrent.DistributedReadWriteLock">
+ <constructor-arg ref="nifiProperties" />
</bean>
<!-- component ui extension configuration context -->
@@ -393,6 +397,7 @@
<bean class="org.apache.nifi.web.api.config.IllegalNodeReconnectionExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.IllegalStateExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.InvalidRevisionExceptionMapper" scope="singleton"/>
+ <bean class="org.apache.nifi.web.api.config.LockExpiredExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.JsonMappingExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.JsonParseExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.MutableRequestExceptionMapper" scope="singleton"/>
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
new file mode 100644
index 0000000..608d0c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+/**
+ * {@link DistributedLockingManager} that hands out one read lock and one write lock,
+ * both backed by the same {@link ReadWriteLockSync} instance.
+ */
+public class DistributedReadWriteLock implements DistributedLockingManager {
+    private final DistributedLock readLock;
+    private final DistributedLock writeLock;
+
+    /**
+     * Creates the lock pair using the expiration period configured under
+     * {@code NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT}, falling back to
+     * {@code NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT}.
+     *
+     * @param properties the properties to read the expiration period from
+     */
+    public DistributedReadWriteLock(final NiFiProperties properties) {
+        this(lockExpirationMillis(properties), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates the lock pair with an explicit expiration period.
+     *
+     * @param lockExpirationPeriod how long an obtained lock remains valid
+     * @param lockExpirationUnit the unit of {@code lockExpirationPeriod}
+     */
+    public DistributedReadWriteLock(final long lockExpirationPeriod, final TimeUnit lockExpirationUnit) {
+        // Both locks must observe the same state, so they share a single sync object.
+        final ReadWriteLockSync sharedSync = new ReadWriteLockSync();
+        this.readLock = new ReentrantDistributedLock(LockMode.SHARED, sharedSync, lockExpirationPeriod, lockExpirationUnit);
+        this.writeLock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sharedSync, lockExpirationPeriod, lockExpirationUnit);
+    }
+
+    /**
+     * Resolves the configured lock expiration period in milliseconds.
+     */
+    private static long lockExpirationMillis(final NiFiProperties properties) {
+        return FormatUtils.getTimeDuration(
+            properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT, NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT),
+            TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return the lock created with {@link LockMode#SHARED}
+     */
+    @Override
+    public DistributedLock getReadLock() {
+        return this.readLock;
+    }
+
+    /**
+     * @return the lock created with {@link LockMode#MUTUALLY_EXCLUSIVE}
+     */
+    @Override
+    public DistributedLock getWriteLock() {
+        return this.writeLock;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
new file mode 100644
index 0000000..a34bb5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Immutable description of a held lock: its version ID, mode, lock count and the
+ * {@link System#nanoTime()}-based instant at which it expires.
+ */
+public class LockInfo {
+    private final String versionId;
+    private final int lockCount;
+    private final LockMode lockMode;
+    // Expiration instant on the System.nanoTime() time line; only meaningful relative to nanoTime().
+    private final long expirationTime;
+
+    /**
+     * Creates a lock description that expires {@code expirationPeriod} after construction.
+     *
+     * @param versionId identifier of the lock
+     * @param lockMode mode in which the lock is held
+     * @param lockCount number of holders recorded for the lock
+     * @param expirationPeriod how long, from now, until the lock is considered expired
+     * @param expirationUnit the unit of {@code expirationPeriod}
+     */
+    public LockInfo(final String versionId, final LockMode lockMode, final int lockCount, final long expirationPeriod, final TimeUnit expirationUnit) {
+        this.versionId = versionId;
+        this.lockMode = lockMode;
+        this.lockCount = lockCount;
+        this.expirationTime = System.nanoTime() + expirationUnit.toNanos(expirationPeriod);
+    }
+
+    /**
+     * @return {@code true} if the expiration instant has passed
+     */
+    public boolean isExpired() {
+        // nanoTime() values can wrap around, so they must be compared by subtraction
+        // (t1 - t0 > 0) rather than directly (t1 > t0), as the System.nanoTime() Javadoc requires.
+        return System.nanoTime() - expirationTime > 0L;
+    }
+
+    public String getVersionId() {
+        return versionId;
+    }
+
+    public int getLockCount() {
+        return lockCount;
+    }
+
+    public LockMode getLockMode() {
+        return lockMode;
+    }
+
+    @Override
+    public String toString() {
+        return "LockInfo[versionId=" + versionId + ", lockMode=" + lockMode + ", lockCount = " + lockCount + ", expired=" + isExpired() + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
new file mode 100644
index 0000000..2cc5eac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+/**
+ * The modes in which a distributed lock can be held.
+ */
+public enum LockMode {
+    /** Mode used for the read lock; a held SHARED lock can be joined by further SHARED requests. */
+    SHARED,
+    /** Mode used for the write lock; it is never shared with another holder. */
+    MUTUALLY_EXCLUSIVE
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
new file mode 100644
index 0000000..ef32f8f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Holds the {@link LockInfo} of the currently held lock (or {@code null} if none has been set)
+ * in an {@link AtomicReference}.
+ */
+public class ReadWriteLockSync {
+    private final AtomicReference<LockInfo> heldLock = new AtomicReference<>();
+
+    /**
+     * @return the current lock information, or {@code null} if none is set
+     */
+    public LockInfo get() {
+        return heldLock.get();
+    }
+
+    /**
+     * Replaces the lock information, but only if it is still the expected instance.
+     *
+     * @param currentLock the lock information the caller last observed
+     * @param updatedLock the lock information to store; may be {@code null} to clear it
+     * @return {@code true} if the stored value was {@code currentLock} and has been replaced
+     */
+    public boolean update(final LockInfo currentLock, final LockInfo updatedLock) {
+        final boolean replaced = heldLock.compareAndSet(currentLock, updatedLock);
+        return replaced;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
new file mode 100644
index 0000000..c51feec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DistributedLock} whose state is kept in a {@link ReadWriteLockSync} and which is
+ * identified by a version ID string rather than by the owning thread. A lock that is not
+ * released expires after the configured expiration period.
+ */
+public class ReentrantDistributedLock implements DistributedLock {
+    private static final Logger logger = LoggerFactory.getLogger(ReentrantDistributedLock.class);
+
+    private final long expirationNanos;
+
+    private final ReadWriteLockSync sync;
+    private final LockMode lockMode;
+
+    /**
+     * @param lockMode the mode in which this lock is obtained
+     * @param sync the object holding the lock state; also used as the monitor for all state changes
+     * @param expirationTimePeriod how long an obtained lock remains valid
+     * @param expirationTimeUnit the unit of {@code expirationTimePeriod}
+     */
+    public ReentrantDistributedLock(final LockMode lockMode, final ReadWriteLockSync sync, final long expirationTimePeriod, final TimeUnit expirationTimeUnit) {
+        this.lockMode = lockMode;
+        this.sync = sync;
+        this.expirationNanos = expirationTimeUnit.toNanos(expirationTimePeriod);
+    }
+
+    /**
+     * @return the lock count of the currently held lock, or 0 if no lock is held or it has expired
+     */
+    int getClaimCount() {
+        final LockInfo currentInfo = sync.get();
+        if (currentInfo == null || currentInfo.isExpired()) {
+            return 0;
+        }
+
+        return currentInfo.getLockCount();
+    }
+
+    @Override
+    public String lock() {
+        return lock(null);
+    }
+
+    @Override
+    public String lock(final String versionIdentifier) {
+        // a negative wait period means "keep trying until the lock is obtained"
+        return tryLock(-1L, TimeUnit.MILLISECONDS, versionIdentifier);
+    }
+
+    @Override
+    public String tryLock(final long time, final TimeUnit timeUnit) {
+        return tryLock(time, timeUnit, null);
+    }
+
+    /**
+     * Repeatedly attempts to obtain the lock until it succeeds or the wait period elapses.
+     *
+     * @param timePeriod maximum time to keep trying; a negative value means no limit
+     * @param timeUnit the unit of {@code timePeriod}
+     * @param versionIdentifier the version ID to use for a newly obtained lock, or {@code null} to generate one
+     * @return the version ID of the obtained lock, or {@code null} if it could not be obtained in time
+     */
+    @Override
+    public String tryLock(final long timePeriod, final TimeUnit timeUnit, final String versionIdentifier) {
+        // Track "no deadline" explicitly instead of using a sentinel value: a deadline computed from
+        // System.nanoTime() may legitimately be zero or negative.
+        final boolean waitIndefinitely = timePeriod < 0;
+        final long deadlineNanos = waitIndefinitely ? 0L : System.nanoTime() + timeUnit.toNanos(timePeriod);
+        logger.debug("Attempting to obtain {} lock with a max wait of {} {}", lockMode, timePeriod, timeUnit);
+
+        long i = 0;
+        while (true) {
+            if (i++ > 0) {
+                // nanoTime() values must be compared by subtraction to be safe across overflow
+                if (!waitIndefinitely && System.nanoTime() - deadlineNanos > 0L) {
+                    logger.debug("Failed to obtain {} lock within {} {}; returning null for tryLock", lockMode, timePeriod, timeUnit);
+                    return null;
+                }
+
+                // If not the first time we've reached this point, we want to
+                // give other threads a chance to release their locks before
+                // we enter the synchronized block.
+                Thread.yield();
+            }
+
+            synchronized (sync) {
+                final LockInfo currentInfo = sync.get();
+                logger.trace("Current Lock Info = {}", currentInfo);
+
+                if (currentInfo == null || currentInfo.isExpired()) {
+                    // There is no lock currently held. Attempt to obtain the lock.
+                    final String versionId = versionIdentifier == null ? UUID.randomUUID().toString() : versionIdentifier;
+                    final boolean updated = updateLockInfo(currentInfo, versionId, 1);
+
+                    if (updated) {
+                        // Lock has been obtained. Return the current version.
+                        logger.debug("Obtained {} lock with Version ID {}", lockMode, versionId);
+                        return versionId;
+                    } else {
+                        // Try again.
+                        logger.debug("Failed to update atomic reference. Trying again");
+                        continue;
+                    }
+                } else {
+                    // There is already a lock held. If the lock that is being held is SHARED,
+                    // and this is a SHARED lock, then we can use it.
+                    if (lockMode == LockMode.SHARED && currentInfo.getLockMode() == LockMode.SHARED) {
+                        logger.debug("Lock is already held but is a shared lock. Attempting to increment lock count");
+
+                        // lock being held is a shared lock, and this is a shared lock. We can just
+                        // update the Lock Info by incrementing the lock count and using a new expiration time.
+                        final boolean updated = updateLockInfo(currentInfo, currentInfo.getVersionId(), currentInfo.getLockCount() + 1);
+                        if (updated) {
+                            // lock info was updated. Return the current version.
+                            logger.debug("Incremented lock count. Obtained {} lock with Version ID {}", lockMode, currentInfo.getVersionId());
+                            return currentInfo.getVersionId();
+                        } else {
+                            // failed to update the lock info, so we have to start over.
+                            logger.debug("Failed to update atomic reference. Trying again");
+                            continue;
+                        }
+                    } else {
+                        // either the lock being held is a mutex or this lock requires a mutex. Either
+                        // way, we cannot enter the lock, so we leave the synchronized block and retry;
+                        // the Thread.yield() at the top of the loop gives other threads a chance to unlock.
+                        logger.debug("Cannot obtain {} lock because it is already held and cannot be shared. Trying again", lockMode);
+                        continue;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Atomically replaces {@code currentInfo} with a new {@link LockInfo} that carries this lock's mode
+     * and a fresh expiration time.
+     *
+     * @return {@code true} if the lock information was replaced
+     */
+    protected boolean updateLockInfo(final LockInfo currentInfo, final String versionId, final int lockCount) {
+        final LockInfo newInfo = new LockInfo(versionId, lockMode, lockCount, expirationNanos, TimeUnit.NANOSECONDS);
+        return sync.update(currentInfo, newInfo);
+    }
+
+    /**
+     * Runs the given action after verifying that {@code identifier} names the currently held,
+     * unexpired lock. The action runs while the monitor of the shared sync object is held.
+     *
+     * @throws LockExpiredException if no lock is held, the identifier does not match, or the lock has expired
+     */
+    @Override
+    public <T> T withLock(final String identifier, final Supplier<T> action) throws LockExpiredException {
+        synchronized (sync) {
+            verifyIdentifier(identifier, sync.get());
+            return action.get();
+        }
+    }
+
+    /**
+     * Releases one hold on the lock named by {@code identifier}; the lock is cleared entirely
+     * once its count reaches zero.
+     *
+     * @throws LockExpiredException if no lock is held, the identifier does not match, or the lock has expired
+     */
+    @Override
+    public void unlock(final String identifier) throws LockExpiredException {
+        synchronized (sync) {
+            final LockInfo info = sync.get();
+            verifyIdentifier(identifier, info);
+
+            final int newLockCount = info.getLockCount() - 1;
+            if (newLockCount <= 0) {
+                sync.update(info, null);
+            } else {
+                // NOTE(review): the replacement uses this lock's mode, not info.getLockMode(), and
+                // verifyIdentifier does not check the mode - confirm callers always unlock through
+                // the same lock they obtained.
+                sync.update(info, new LockInfo(info.getVersionId(), lockMode, newLockCount, expirationNanos, TimeUnit.NANOSECONDS));
+            }
+        }
+    }
+
+    /**
+     * Ensures that {@code lockInfo} exists, carries the given identifier and has not expired.
+     */
+    private void verifyIdentifier(final String identifier, final LockInfo lockInfo) throws LockExpiredException {
+        if (lockInfo == null) {
+            throw new LockExpiredException("No lock has been obtained");
+        }
+
+        if (!lockInfo.getVersionId().equals(identifier)) {
+            throw new LockExpiredException("Incorrect Lock ID provided. This typically means that the lock has already expired and another lock has been obtained.");
+        }
+
+        if (lockInfo.isExpired()) {
+            throw new LockExpiredException("Lock has already expired");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
index ecc725b..5911120 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
@@ -17,30 +17,16 @@
package org.apache.nifi.web.revision;
-import java.text.Collator;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
import org.slf4j.Logger;
@@ -58,197 +44,59 @@ import org.slf4j.LoggerFactory;
public class NaiveRevisionManager implements RevisionManager {
private static final Logger logger = LoggerFactory.getLogger(NaiveRevisionManager.class);
- private final long claimExpirationNanos;
- private final ConcurrentMap<String, RevisionLock> revisionLockMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Revision> revisionMap = new ConcurrentHashMap<>();
- public NaiveRevisionManager() {
- this(1, TimeUnit.MINUTES);
- }
-
- public NaiveRevisionManager(final NiFiProperties properties) {
- this(getRequestTimeoutMillis(properties), TimeUnit.MILLISECONDS);
- }
-
- /**
- * Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time
- * for a Revision Claims
- *
- * @param claimExpiration how long a Revision Claim should last
- * @param timeUnit the TimeUnit of 'claimExpiration'
- */
- public NaiveRevisionManager(final long claimExpiration, final TimeUnit timeUnit) {
- this.claimExpirationNanos = timeUnit.toNanos(claimExpiration);
- }
-
- private static long getRequestTimeoutMillis(final NiFiProperties properties) {
- return FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
- NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public RevisionClaim requestClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException {
- Objects.requireNonNull(user);
- return requestClaim(Collections.singleton(revision), user);
- }
@Override
public void reset(final Collection<Revision> revisions) {
- final Map<String, RevisionLock> copy;
- synchronized (this) {
- copy = new HashMap<>(revisionLockMap);
- revisionLockMap.clear();
+ synchronized (this) { // avoid allowing two threads to reset versions concurrently
+ revisionMap.clear(); // discard every revision tracked so far
for (final Revision revision : revisions) {
- revisionLockMap.put(revision.getComponentId(), new RevisionLock(new FlowModification(revision, null), claimExpirationNanos));
+ revisionMap.put(revision.getComponentId(), revision); // index each replacement revision by its component ID
}
}
-
- for (final RevisionLock lock : copy.values()) {
- lock.clear();
- }
}
@Override
public List<Revision> getAllRevisions() {
- return revisionLockMap.values().stream()
- .map(lock -> lock.getRevision())
- .collect(Collectors.toList());
- }
-
- @Override
- public RevisionClaim requestClaim(final Collection<Revision> revisions, final NiFiUser user) {
- Objects.requireNonNull(user);
- logger.debug("Attempting to claim Revisions {}", revisions);
-
- // Try to obtain a Revision Claim (temporary lock) on all revisions
- final List<Revision> revisionList = new ArrayList<>(revisions);
- revisionList.sort(new RevisionComparator());
-
- ClaimResult failedClaimResult = null;
- final Set<RevisionLock> locksObtained = new HashSet<>();
- for (int i = 0; i < revisionList.size(); i++) {
- final Revision revision = revisionList.get(i);
- final RevisionLock revisionLock = getRevisionLock(revision);
-
- final ClaimResult claimResult = revisionLock.requestClaim(revision, user);
- logger.trace("Obtained Revision Claim for {}", revision);
-
- if (claimResult.isSuccessful()) {
- locksObtained.add(revisionLock);
- } else {
- logger.debug("Failed to obtain Revision Claim for component with ID {} because Current Revision is {} but supplied Revision is {}",
- revision.getComponentId(), claimResult.getLastModification().getRevision(), revision);
-
- failedClaimResult = claimResult;
- break;
- }
- }
-
- // if we got a Revision Claim on each Revision, return a successful result
- if (locksObtained.size() == revisionList.size()) {
- logger.trace("Obtained Revision Claim for all components");
-
- // it's possible that obtaining the locks took a while if we are obtaining
- // many. Renew the timestamp to ensure that the first locks obtained don't
- // expire too quickly.
- final long timestamp = System.nanoTime() + claimExpirationNanos;
- for (final RevisionLock revisionLock : locksObtained) {
- revisionLock.renewExpiration(timestamp);
- }
-
- return new StandardRevisionClaim(revisions);
- }
-
- // We failed to obtain all of the Revision Claims necessary. Since
- // we need this call to atomically obtain all or nothing, we have to now
- // release the locks that we did obtain.
- logger.debug("Failed to obtain all necessary Revisions; releasing claims for {}", locksObtained);
- for (final RevisionLock revisionLock : locksObtained) {
- revisionLock.releaseClaim();
- }
-
- final FlowModification lastMod = failedClaimResult.getLastModification();
- if (lastMod.getRevision().getClientId() == null || lastMod.getRevision().getClientId().trim().isEmpty() || lastMod.getRevision().getVersion() == null) {
- throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.",
- failedClaimResult.getProposedRevision(), lastMod.getRevision()));
- } else {
- throw new InvalidRevisionException(String.format("Component %s has been updated by '%s'. Please refresh to synchronize the view.",
- failedClaimResult.getProposedRevision().getComponentId(), lastMod.getLastModifier()));
- }
+ return new ArrayList<>(revisionMap.values());
}
@Override
public Revision getRevision(final String componentId) {
- final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
- return revisionLock.getRevision();
+ return revisionMap.computeIfAbsent(componentId, id -> new Revision(0L, null, componentId)); // an unknown component ID is registered with a default Revision(0L, null, componentId), which is then returned
}
@Override
public <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
Objects.requireNonNull(user);
logger.debug("Attempting to delete revision using {}", claim);
- int successCount = 0;
final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
revisionList.sort(new RevisionComparator());
+ // Verify the provided revisions; 'failedId' is never assigned on this path, so report the offending revision's own component ID.
String failedId = null;
for (final Revision revision : revisionList) {
- final RevisionLock revisionLock = getRevisionLock(revision);
- final boolean verified = revisionLock.requestWriteLock(revision, user);
-
- if (verified) {
- logger.trace("Verified Revision Claim for {}", revision);
- successCount++;
- } else {
- logger.debug("Failed to verify Revision Claim for {}", revision);
- failedId = revision.getComponentId();
- break;
+ final Revision curRevision = getRevision(revision.getComponentId());
+ if (!curRevision.equals(revision)) {
+ throw new ExpiredRevisionClaimException("Invalid Revision was given for component with ID '" + revision.getComponentId() + "'");
}
}
- if (successCount == revisionList.size()) {
- logger.debug("Successfully verified Revision Claim for all revisions {}", claim);
-
- final T taskValue;
- try {
- taskValue = task.performTask();
- } catch (final Exception e) {
- logger.debug("Failed to perform Claim Deletion task. Will relinquish the Revision Claims for the following revisions: {}", revisionList);
-
- for (final Revision revision : revisionList) {
- final RevisionLock revisionLock = getRevisionLock(revision);
- revisionLock.unlock(revision, revision, user.getIdentity());
- logger.debug("Relinquished lock for {}", revision);
- }
-
- throw e;
- }
-
- for (final Revision revision : revisionList) {
- deleteRevisionLock(revision);
- logger.debug("Deleted Revision {}", revision);
- }
+ // Perform the action provided
+ final T taskResult = task.performTask();
- return taskValue;
- }
-
- // We failed to obtain a thread lock for all revisions. Relinquish
- // any Revision Claims that we have
- for (int i = 0; i < successCount; i++) {
- final Revision revision = revisionList.get(i);
- final RevisionLock revisionLock = getRevisionLock(revision);
- revisionLock.relinquishRevisionClaim(revision, null);
- logger.debug("Relinquished lock for {}", revision);
+ for (final Revision revision : revisionList) {
+ revisionMap.remove(revision.getComponentId());
}
- // Throw an Exception indicating that we failed to obtain the locks
- throw new ExpiredRevisionClaimException("Invalid Revision was given for component with ID '" + failedId + "'");
+ return taskResult;
}
@Override
public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
Objects.requireNonNull(user);
- int successCount = 0;
logger.debug("Attempting to update revision using {}", originalClaim);
final List<Revision> revisionList = new ArrayList<>(originalClaim.getRevisions());
@@ -256,523 +104,55 @@ public class NaiveRevisionManager implements RevisionManager {
String failedId = null;
for (final Revision revision : revisionList) {
- final RevisionLock revisionLock = getRevisionLock(revision);
- final boolean verified = revisionLock.requestWriteLock(revision, user);
+ final Revision currentRevision = getRevision(revision.getComponentId());
+ final boolean verified = revision.equals(currentRevision);
- if (verified) {
- logger.trace("Verified Revision Claim for {}", revision);
- successCount++;
- } else {
- logger.debug("Failed to verify Revision Claim for {}", revision);
- failedId = revision.getComponentId();
- break;
+ if (!verified) {
+ // The supplied revision does not match the current one; 'failedId' is never assigned here, so name the component directly
+ throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + revision.getComponentId() + "'");
}
}
// We successfully verified all revisions.
- if (successCount == revisionList.size()) {
- logger.debug("Successfully verified Revision Claim for all revisions");
-
- RevisionUpdate<T> updatedComponent = null;
- try {
- updatedComponent = task.update();
- } finally {
- // Release the lock that we are holding and update the revision.
- // To do this, we need to map the old revision to the new revision
- // so that we have an efficient way to lookup the pairing, so that
- // we can easily obtain the old revision and the new revision for
- // the same component in order to call #unlock on the RevisionLock
- final Map<Revision, Revision> updatedRevisions = new HashMap<>();
- final Map<String, Revision> revisionsByComponentId = new HashMap<>();
- for (final Revision revision : revisionList) {
- updatedRevisions.put(revision, revision);
- revisionsByComponentId.put(revision.getComponentId(), revision);
- }
-
- if (updatedComponent != null) {
- for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) {
- final Revision oldRevision = revisionsByComponentId.get(updatedRevision.getComponentId());
- if (oldRevision != null) {
- updatedRevisions.put(oldRevision, updatedRevision);
- }
- }
- }
-
- for (final Revision revision : revisionList) {
- final Revision updatedRevision = updatedRevisions.get(revision);
- getRevisionLock(revision).unlock(revision, updatedRevision, user.getIdentity());
-
- if (updatedRevision.getVersion() != revision.getVersion()) {
- logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
- } else {
- logger.debug("Unlocked Revision {} without updating Version", revision);
- }
- }
- }
-
- return updatedComponent;
- }
-
- // We failed to obtain a thread lock for all revisions. Relinquish
- // any Revision Claims that we have
- for (int i = 0; i < successCount; i++) {
- final Revision revision = revisionList.get(i);
- final RevisionLock revisionLock = getRevisionLock(revision);
- revisionLock.cancelWriteLock();
- logger.debug("Relinquished lock for {}", revision);
- }
-
- // Throw an Exception indicating that we failed to obtain the locks
- throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'");
- }
-
- @Override
- public boolean releaseClaim(final RevisionClaim claim, final NiFiUser user) {
- Objects.requireNonNull(user);
- boolean success = true;
-
- final List<Revision> revisions = new ArrayList<>(claim.getRevisions());
- revisions.sort(new RevisionComparator());
-
- for (final Revision revision : revisions) {
- final RevisionLock revisionLock = getRevisionLock(revision);
- success = revisionLock.relinquishRevisionClaim(revision, user) && success;
- }
-
- return success;
- }
-
- @Override
- public boolean cancelClaim(String componentId) {
- logger.debug("Attempting to cancel claim for component {}", componentId);
- final Revision revision = new Revision(0L, null, componentId);
-
- final RevisionLock revisionLock = getRevisionLock(revision);
- if (revisionLock == null) {
- logger.debug("No Revision Lock exists for Component {} - there is no claim to cancel", componentId);
- return false;
- }
-
- return revisionLock.releaseClaimIfCurrentThread(null);
- }
-
- @Override
- public boolean cancelClaim(Revision revision) {
- logger.debug("Attempting to cancel claim for {}", revision);
-
- final RevisionLock revisionLock = getRevisionLock(revision);
- if (revisionLock == null) {
- logger.debug("No Revision Lock exists for {} - there is no claim to cancel", revision);
- return false;
- }
-
- return revisionLock.releaseClaimIfCurrentThread(revision);
- }
-
- @Override
- public boolean cancelClaims(final Set<Revision> revisions) {
- boolean successful = false;
- for (final Revision revision : revisions) {
- successful = cancelClaim(revision);
- }
-
- return successful;
- }
-
- @Override
- public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) {
- final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
- logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision());
- if (logger.isTraceEnabled()) {
- logger.trace("Attempting to obtain read lock due to following stack trace", new RuntimeException("Exception for generating stack trace for debugging purposes"));
- }
-
- revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
- logger.debug("Obtained read lock for {}", revisionLock.getRevision());
-
- try {
- return callback.withRevision(revisionLock.getRevision());
- } finally {
- logger.debug("Releasing read lock for {}", revisionLock.getRevision());
- revisionLock.relinquishReadLock();
- }
- }
-
- @Override
- public <T> T get(final Set<String> componentIds, final Supplier<T> callback) {
- final List<String> sortedIds = new ArrayList<>(componentIds);
- sortedIds.sort(Collator.getInstance());
-
- final Stack<RevisionLock> revisionLocks = new Stack<>();
-
- logger.debug("Will attempt to obtain read locks for components {}", componentIds);
- if (logger.isTraceEnabled()) {
- logger.trace("Attempting to obtain read lock due to following stack trace", new RuntimeException("Exception for generating stack trace for debugging purposes"));
- }
-
- for (final String componentId : sortedIds) {
- final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
-
- logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision());
- revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
- revisionLocks.push(revisionLock);
- logger.trace("Obtained read lock for {}", revisionLock.getRevision());
- }
+ logger.debug("Successfully verified Revision Claim for all revisions");
- logger.debug("Obtained read lock for all necessary components {}; calling call-back", componentIds);
+ RevisionUpdate<T> updatedComponent = null;
try {
- return callback.get();
+ updatedComponent = task.update();
} finally {
- while (!revisionLocks.isEmpty()) {
- final RevisionLock lock = revisionLocks.pop();
- logger.debug("Releasing read lock for {}", lock.getRevision());
- lock.relinquishReadLock();
- }
- }
- }
-
- private synchronized void deleteRevisionLock(final Revision revision) {
- final RevisionLock revisionLock = revisionLockMap.remove(revision.getComponentId());
- if (revisionLock == null) {
- return;
- }
-
- revisionLock.releaseClaim();
- }
-
- private synchronized RevisionLock getRevisionLock(final Revision revision) {
- return revisionLockMap.computeIfAbsent(revision.getComponentId(), id -> new RevisionLock(new FlowModification(revision, null), claimExpirationNanos));
- }
-
-
- private static class RevisionLock {
- private final AtomicReference<FlowModification> lastModReference = new AtomicReference<>();
- private final AtomicReference<LockStamp> lockStamp = new AtomicReference<>();
- private final long lockNanos;
- private final ReadWriteLock threadLock = new ReentrantReadWriteLock();
-
- public RevisionLock(final FlowModification lastMod, final long lockNanos) {
- this.lockNanos = lockNanos;
- lastModReference.set(lastMod);
- }
-
- /**
- * Requests that a Revision Claim be granted for the proposed Revision
- *
- * @param proposedRevision the revision to obtain a Claim for
- *
- * @return <code>true</code> if the Revision is valid and a Claim has been granted, <code>false</code> otherwise
- */
- public ClaimResult requestClaim(final Revision proposedRevision, final NiFiUser user) {
- // acquire the claim, blocking if necessary.
- acquireClaim(user, proposedRevision.getClientId());
-
- threadLock.writeLock().lock();
- try {
- // check if the revision is correct
- final FlowModification lastModification = lastModReference.get();
-
- final Revision currentRevision = lastModification.getRevision();
- if (proposedRevision.equals(currentRevision)) {
- // revision is correct - return true
- return new ClaimResult(true, lastModification, proposedRevision);
- }
-
- // revision is incorrect. Release the Claim and return false
- releaseClaim();
- logger.debug("Cannot obtain Revision Claim {} because the Revision is out-of-date. Current revision is {}", proposedRevision, currentRevision);
- return new ClaimResult(false, lastModification, proposedRevision);
- } finally {
- threadLock.writeLock().unlock();
- }
- }
-
- /**
- * Verifies that the given Revision has a Claim against it already and that the Claim belongs
- * to the same client as the given Revision. If so, upgrades the Revision Claim to a lock that
- * will not be relinquished until the {@link #unlock(Revision)} method is called.
- *
- * @param proposedRevision the current Revision
- * @return <code>true</code> if the Revision Claim was upgraded to a lock, <code>false</code> otherwise
- * @throws ExpiredRevisionClaimException if the Revision Claim for the given Revision has already expired
- */
- public boolean requestWriteLock(final Revision proposedRevision, final NiFiUser user) throws ExpiredRevisionClaimException {
- Objects.requireNonNull(proposedRevision);
- threadLock.writeLock().lock();
-
- boolean releaseLock = true;
- try {
- if (getRevision().equals(proposedRevision)) {
- final LockStamp stamp = lockStamp.get();
-
- if (stamp == null) {
- final IllegalStateException ise = new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification");
- logger.debug("Attempted to obtain write lock for {} but no Claim was obtained; throwing IllegalStateException", proposedRevision, ise);
- throw ise;
- }
-
- final boolean userEqual = stamp.getUser() == null || stamp.getUser().equals(user);
- if (!userEqual) {
- logger.debug("Failed to verify {} because the User was not the same as the Lock Stamp's User (Lock Stamp was {})", proposedRevision, stamp);
- throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed by " + stamp.getUser());
- }
-
- final boolean clientIdEqual = stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId());
- if (!clientIdEqual) {
- logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
- throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed with a different Client ID");
- }
-
- // TODO - Must make sure that we don't have an expired stamp if it is the result of another
- // operation taking a long time. I.e., Client A fires off two requests for Component X. If the
- // first one takes 2 minutes to complete, it should not result in the second request getting
- // rejected. I.e., we want to ensure that if the request is received before the Claim expired,
- // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
- // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
- // not fulfill the second phase of the two-phase commit.
- // We may need a Queue of updates (queue would need to be bounded, with a request getting
- // rejected if queue is full).
- if (stamp.isExpired()) {
- throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
- }
-
- // Intentionally leave the thread lock in a locked state!
- releaseLock = false;
- return true;
- }
- } finally {
- if (releaseLock) {
- threadLock.writeLock().unlock();
- }
+ // Record the outcome of the update in the revision map. To do this,
+ // we need to map the old revision to the new revision so that we have
+ // an efficient way to look up the pairing; components that the task
+ // did not report an updated revision for (including when the task
+ // threw) map to themselves and so keep their current revision.
+ final Map<Revision, Revision> updatedRevisions = new HashMap<>();
+ final Map<String, Revision> revisionsByComponentId = new HashMap<>();
+ for (final Revision revision : revisionList) {
+ updatedRevisions.put(revision, revision);
+ revisionsByComponentId.put(revision.getComponentId(), revision);
}
- return false;
- }
-
- private void acquireClaim(final NiFiUser user, final String clientId) {
- while (true) {
- final LockStamp stamp = lockStamp.get();
-
- if (stamp == null || stamp.isExpired()) {
- final long now = System.nanoTime();
- final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(user, clientId, now + lockNanos));
- if (lockObtained) {
- return;
+ if (updatedComponent != null) {
+ for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) {
+ final Revision oldRevision = revisionsByComponentId.get(updatedRevision.getComponentId());
+ if (oldRevision != null) {
+ updatedRevisions.put(oldRevision, updatedRevision);
}
- } else {
- Thread.yield();
- }
- }
- }
-
- public void acquireReadLock(final NiFiUser user, final String clientId) {
- // Wait until we can claim the lock stamp
- boolean obtained = false;
- while (!obtained) {
- // If the lock stamp is not null, then there is either an active Claim or a
- // write lock held. Wait until it is null and then replace it atomically
- // with a LockStamp that does not expire (expiration time is Long.MAX_VALUE).
- final LockStamp curStamp = lockStamp.get();
- final boolean nullOrExpired = (curStamp == null || curStamp.isExpired());
- obtained = nullOrExpired && lockStamp.compareAndSet(curStamp, new LockStamp(user, clientId, Long.MAX_VALUE));
-
- if (!obtained) {
- // Could not obtain lock. Yield so that we don't sit around doing nothing with the thread.
- Thread.yield();
}
}
- // Now we can obtain the read lock without problem.
- threadLock.readLock().lock();
- }
-
- public void relinquishReadLock() {
- lockStamp.set(null);
- threadLock.readLock().unlock();
- }
-
- private void releaseClaim() {
- lockStamp.set(null);
- }
-
- public void clear() {
- threadLock.writeLock().lock();
- try {
- releaseClaim();
- } finally {
- threadLock.writeLock().unlock();
- }
- }
-
- public boolean releaseClaimIfCurrentThread(final Revision revision) {
- threadLock.writeLock().lock();
- try {
- final LockStamp stamp = lockStamp.get();
- if (stamp == null) {
- logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision());
- return false;
- }
-
- if (revision != null && !getRevision().equals(revision)) {
- throw new InvalidRevisionException("Cannot release claim because the provided Revision is not valid");
- }
-
- if (stamp.isObtainedByCurrentThread()) {
- releaseClaim();
- logger.debug("Successfully canceled claim for {}", getRevision());
- return true;
- }
-
- logger.debug("Cannot cancel claim for {} because it is held by Thread {} and current Thread is {}",
- getRevision(), stamp.obtainingThread, Thread.currentThread().getName());
- return false;
- } finally {
- threadLock.writeLock().unlock();
- }
- }
+ for (final Revision revision : revisionList) {
+ final Revision updatedRevision = updatedRevisions.get(revision);
+ revisionMap.put(updatedRevision.getComponentId(), updatedRevision);
- /**
- * Releases the Revision Claim if and only if the current revision matches the proposed revision
- *
- * @param proposedRevision the proposed revision to check against the current revision
- * @return <code>true</code> if the Revision Claim was relinquished, <code>false</code> otherwise
- */
- public boolean relinquishRevisionClaim(final Revision proposedRevision, final NiFiUser user) {
- threadLock.writeLock().lock();
- try {
- final LockStamp stamp = lockStamp.get();
- final boolean userOk = stamp == null || stamp.getUser().equals(user);
- if (userOk) {
- if (getRevision().equals(proposedRevision)) {
- releaseClaim();
- return true;
- }
+ if (updatedRevision.getVersion() != revision.getVersion()) {
+ logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
} else {
- throw new InvalidRevisionException("Cannot relinquish claim for " + proposedRevision + " because it was claimed by " + stamp.getUser());
+ logger.debug("Unlocked Revision {} without updating Version", revision);
}
-
- return false;
- } finally {
- threadLock.writeLock().unlock();
- }
- }
-
- /**
- * Releases the lock and any Revision Claim that is held for the given Revision and
- * updates the revision
- *
- * @param proposedRevision the current Revision
- * @param updatedRevision the Revision to update the current revision to
- */
- public void unlock(final Revision proposedRevision, final Revision updatedRevision, final String modifier) {
- final Revision curRevision = getRevision();
- if (curRevision == null) {
- throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because it is not locked");
- }
-
- if (!curRevision.equals(proposedRevision)) {
- // Intentionally leave the thread lock in a locked state!
- throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because the version is not valid");
}
-
- lastModReference.set(new FlowModification(updatedRevision, modifier));
-
- // Set stamp to null to indicate that it is not locked.
- releaseClaim();
-
- // Thread Lock should already be locked if this is called.
- threadLock.writeLock().unlock();
- }
-
- public void cancelWriteLock() {
- releaseClaim();
- threadLock.writeLock().unlock();
- }
-
- /**
- * Updates expiration time to the given timestamp
- *
- * @param timestamp the new expiration timestamp in nanoseconds
- */
- public void renewExpiration(final long timestamp) {
- final LockStamp stamp = lockStamp.get();
-
- final NiFiUser user;
- final String clientId;
- if (stamp == null) {
- user = null;
- clientId = null;
- } else {
- user = stamp.getUser();
- clientId = stamp.getClientId();
- }
-
- lockStamp.set(new LockStamp(user, clientId, timestamp));
}
- public Revision getRevision() {
- final FlowModification lastMod = lastModReference.get();
- return (lastMod == null) ? null : lastMod.getRevision();
- }
- }
-
-
- private static class LockStamp {
- private final NiFiUser user;
- private final String clientId;
- private final long expirationTimestamp;
- private final Thread obtainingThread;
-
- public LockStamp(final NiFiUser user, final String clientId, final long expirationTimestamp) {
- this.user = user;
- this.clientId = clientId;
- this.expirationTimestamp = expirationTimestamp;
- this.obtainingThread = Thread.currentThread();
- }
-
- public NiFiUser getUser() {
- return user;
- }
-
- public String getClientId() {
- return clientId;
- }
-
- public boolean isExpired() {
- return System.nanoTime() > expirationTimestamp;
- }
-
- public boolean isObtainedByCurrentThread() {
- return obtainingThread == Thread.currentThread();
- }
-
- @Override
- public String toString() {
- return "LockStamp[user=" + user + ", clientId=" + clientId + ", expired=" + isExpired() + "]";
- }
- }
-
- private static class ClaimResult {
- private final boolean successful;
- private final FlowModification lastMod;
- private final Revision proposedRevision;
-
- public ClaimResult(final boolean successful, final FlowModification lastMod, final Revision proposedRevision) {
- this.successful = successful;
- this.lastMod = lastMod;
- this.proposedRevision = proposedRevision;
- }
-
- public boolean isSuccessful() {
- return successful;
- }
-
- public FlowModification getLastModification() {
- return lastMod;
- }
-
- public Revision getProposedRevision() {
- return proposedRevision;
- }
+ return updatedComponent;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4c94e34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
new file mode 100644
index 0000000..8027614
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReentrantDistributedLock {
+    private ReadWriteLockSync sync; // passed to every lock these tests construct
+
+    @Before
+    public void setup() { // runs before each test, so no test sees another test's sync
+        this.sync = new ReadWriteLockSync();
+    }
+
+    @Test(timeout = 5000)
+    public void testMultipleReadLocks() throws LockExpiredException {
+        // A shared lock may be claimed repeatedly: each claim returns the same id and bumps the claim count.
+        final ReentrantDistributedLock readLock = createReadLock();
+        final String firstClaim = readLock.lock();
+        final String secondClaim = readLock.lock();
+        assertEquals(firstClaim, secondClaim);
+        assertEquals(2, readLock.getClaimCount());
+        readLock.unlock(firstClaim); // every unlock releases exactly one claim
+        assertEquals(1, readLock.getClaimCount());
+        readLock.unlock(secondClaim);
+        assertEquals(0, readLock.getClaimCount());
+    }
+
+    @Test(timeout = 10000)
+    public void testMultipleWriteLocksBlock() throws LockExpiredException {
+        // A mutually exclusive lock that is already held must refuse a second claim until it is released.
+        final ReentrantDistributedLock lock = createWriteLock();
+        final String id1 = lock.lock();
+        assertNotNull(id1);
+
+        final long startTime = System.nanoTime();
+        final String id2 = lock.tryLock(500, TimeUnit.MILLISECONDS);
+        assertNull(id2);
+        // The wait is not exact, but a failed 500 ms tryLock should have blocked for more than 350 ms.
+        final long nanos = System.nanoTime() - startTime;
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
+
+        lock.unlock(id1);
+        final String id3 = lock.tryLock(500, TimeUnit.MILLISECONDS);
+        assertNotNull(id3);
+        // Compare by value: assertNotSame checks references only, so equal id strings could slip through.
+        assertTrue("a re-acquired lock must be issued a new id", !id1.equals(id3));
+        lock.unlock(id3);
+    }
+
+    @Test(timeout = 10000)
+    public void testReadLockBlocksWriteLock() throws LockExpiredException {
+        // While a shared (read) claim is outstanding, the write lock on the same sync must not be granted.
+        final ReentrantDistributedLock readLock = createReadLock();
+        final ReentrantDistributedLock writeLock = createWriteLock();
+
+        final String id1 = readLock.lock();
+        assertNotNull(id1);
+
+        final long startTime = System.nanoTime();
+        final String id2 = writeLock.tryLock(500, TimeUnit.MILLISECONDS);
+        assertNull(id2);
+
+        // The wait is not exact, but a failed 500 ms tryLock should have blocked for more than 350 ms.
+        final long nanos = System.nanoTime() - startTime;
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
+
+        readLock.unlock(id1);
+
+        final String id3 = writeLock.lock();
+        assertNotNull(id3);
+        // Compare by value: assertNotSame checks references only, so equal id strings could slip through.
+        assertTrue("the write lock must be issued its own id", !id1.equals(id3));
+        writeLock.unlock(id3);
+    }
+
+    @Test(timeout = 10000)
+    public void testWriteLockBlocksReadLock() throws LockExpiredException {
+        // While the mutually exclusive (write) lock is held, a read claim on the same sync must not be granted.
+        final ReentrantDistributedLock readLock = createReadLock();
+        final ReentrantDistributedLock writeLock = createWriteLock();
+
+        final String id1 = writeLock.lock();
+        assertNotNull(id1);
+
+        final long startTime = System.nanoTime();
+        final String id2 = readLock.tryLock(500, TimeUnit.MILLISECONDS);
+        assertNull(id2);
+
+        // The wait is not exact, but a failed 500 ms tryLock should have blocked for more than 350 ms.
+        final long nanos = System.nanoTime() - startTime;
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
+
+        writeLock.unlock(id1);
+
+        final String id3 = readLock.lock();
+        assertNotNull(id3);
+        // Compare by value: assertNotSame checks references only, so equal id strings could slip through.
+        assertTrue("the read lock must be issued its own id", !id1.equals(id3));
+        readLock.unlock(id3);
+    }
+
+    @Test(timeout = 10000)
+    public void testMultipleReadLocksBlockingOnWriteLock() throws InterruptedException, LockExpiredException {
+        // Readers that arrive while the write lock is held must wait, then all end up with the same read lock id.
+        final ReentrantDistributedLock readLock = createReadLock();
+        final ReentrantDistributedLock writeLock = createWriteLock();
+
+        final String writerId = writeLock.lock();
+        assertNotNull(writerId);
+
+        // Launch three readers; each one stores the id it is granted in its own slot.
+        final ExecutorService pool = Executors.newFixedThreadPool(3);
+        final AtomicReferenceArray<String> grantedIds = new AtomicReferenceArray<>(3);
+        for (int i = 0; i < 3; i++) {
+            final int slot = i;
+            pool.submit(() -> {
+                final String readerId = readLock.lock();
+                assertNotNull(readerId);
+                grantedIds.set(slot, readerId);
+            });
+        }
+
+        // Give the readers time to run; none may have recorded an id while the writer still holds the lock.
+        Thread.sleep(250L);
+        for (int i = 0; i < 3; i++) {
+            assertNull(grantedIds.get(i));
+        }
+
+        // Release the writer so that the readers can lock, then wait for all of them to finish.
+        writeLock.unlock(writerId);
+        pool.shutdown();
+        pool.awaitTermination(10, TimeUnit.MINUTES);
+
+        // Every reader must have been handed the same id.
+        final String sharedId = grantedIds.get(0);
+        assertNotNull(sharedId);
+        for (int i = 0; i < 3; i++) {
+            assertEquals(sharedId, grantedIds.get(i));
+        }
+
+        // Each unlock drops exactly one of the three claims.
+        for (int remaining = 3; remaining > 0; remaining--) {
+            assertEquals(remaining, readLock.getClaimCount());
+            readLock.unlock(sharedId);
+        }
+
+        assertEquals(0, readLock.getClaimCount());
+    }
+
+    @Test(timeout = 10000)
+    public void testLockExpires() {
+        // Once the 25 ms given to the constructor runs out, a second lock() call must succeed with a new id.
+        final ReentrantDistributedLock lock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 25, TimeUnit.MILLISECONDS);
+        final String id1 = lock.lock();
+        assertNotNull(id1);
+
+        final long start = System.nanoTime();
+        final String id2 = lock.lock();
+        final long nanos = System.nanoTime() - start;
+        assertNotNull(id2);
+        // Compare by value: assertNotSame checks references only, so equal id strings could slip through.
+        assertTrue("the lock taken over after expiry must get a new id", !id1.equals(id2));
+        // The second lock() may return slightly before the full 25 ms has elapsed, so allow 5 ms of slack.
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(20));
+    }
+
+    @Test(timeout = 10000)
+    public void testWithLock() throws LockExpiredException, Exception {
+        // withLock must hand back the action's own result and leave the number of claims unchanged.
+        final ReentrantDistributedLock writeLock = createWriteLock();
+        final String lockId = writeLock.lock();
+        assertEquals(1, writeLock.getClaimCount());
+        final Object expected = new Object();
+        final Object actual = writeLock.withLock(lockId, () -> expected);
+        assertTrue(expected == actual);
+        assertEquals(1, writeLock.getClaimCount());
+        writeLock.unlock(lockId);
+        assertEquals(0, writeLock.getClaimCount());
+    }
+
+    private ReentrantDistributedLock createReadLock() { // SHARED-mode lock on this test's sync, built with 30 seconds
+        return new ReentrantDistributedLock(LockMode.SHARED, this.sync, 30, TimeUnit.SECONDS);
+    }
+
+    private ReentrantDistributedLock createWriteLock() { // MUTUALLY_EXCLUSIVE-mode lock on this test's sync, built with 30 seconds
+        return new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, this.sync, 30, TimeUnit.SECONDS);
+    }
+}