You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/01 02:37:53 UTC
[34/50] [abbrv] ignite git commit: IGNITE-642 Implement
IgniteReentrantLock data structure
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
new file mode 100644
index 0000000..3ab7289
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -0,0 +1,1538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.ObjectStreamException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache reentrant lock implementation based on AbstractQueuedSynchronizer.
+ */
+public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Deserialization stash. */
+ private static final ThreadLocal<String> stash = new ThreadLocal<>();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Reentrant lock name. */
+ private String name;
+
+ /** Removed flag. */
+ private volatile boolean rmvd;
+
+ /** Reentrant lock key. */
+ private GridCacheInternalKey key;
+
+ /** Reentrant lock projection. */
+ private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView;
+
+ /** Cache context. */
+ private final GridCacheContext ctx;
+
+ /** Initialization guard. */
+ private final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Initialization latch. */
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+
+ /** Lock that provides non-overlapping processing of updates. */
+ private Lock updateLock = new ReentrantLock();
+
+ /** Internal synchronization object. */
+ private Sync sync;
+
+ /** Flag indicating that every operation on this lock should be interrupted. */
+ private volatile boolean interruptAll;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridCacheLockImpl() {
+ // This instance should never be used directly.
+ ctx = null;
+ }
+
+ /**
+ * Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer.
+ */
+ @SuppressWarnings({"CallToThreadYield", "CallToSignalInsteadOfSignalAll"})
+ private class Sync extends AbstractQueuedSynchronizer {
+ /** */
+ private static final long serialVersionUID = 1192457210091910933L;
+
+ /** */
+ private static final long LOCK_FREE = 0;
+
+ /** Map containing condition objects. */
+ private Map<String, ConditionObject> conditionMap;
+
+ /** List of condition signal calls on this node. */
+ private Map<String, Integer> outgoingSignals;
+
+ /** Last condition waited on. */
+ @Nullable
+ private volatile String lastCondition;
+
+ /** True if any node owning the lock had failed. */
+ private volatile boolean isBroken;
+
+ /** UUID of the node that currently owns the lock. */
+ private volatile UUID currentOwnerNode;
+
+ /** ID of the thread that currently owns the lock. */
+ private volatile long currentOwnerThreadId;
+
+ /** UUID of this node. */
+ private final UUID thisNode;
+
+ /** FailoverSafe flag. */
+ private final boolean failoverSafe;
+
+ /** Fairness flag. */
+ private final boolean fair;
+
+ /** Threads that are waiting on this lock. */
+ private Set<Long> waitingThreads;
+
+ /**
+ * @param state State.
+ */
+ protected Sync(GridCacheLockState state) {
+ setState(state.get());
+
+ thisNode = ctx.localNodeId();
+
+ currentOwnerNode = state.getId();
+
+ currentOwnerThreadId = state.getThreadId();
+
+ conditionMap = new HashMap<>();
+
+ outgoingSignals = new HashMap<>();
+
+ failoverSafe = state.isFailoverSafe();
+
+ fair = state.isFair();
+
+ waitingThreads = new ConcurrentSkipListSet<>();
+ }
+
+ /**
+ *
+ */
+ protected void addOutgoingSignal(String condition) {
+ int cnt = 0;
+
+ if (outgoingSignals.containsKey(condition)) {
+ cnt = outgoingSignals.get(condition);
+
+ // SignalAll has already been called.
+ if (cnt == 0)
+ return;
+ }
+
+ outgoingSignals.put(condition, cnt + 1);
+ }
+
+ protected void addOutgoingSignalAll(String condition) {
+ outgoingSignals.put(condition, 0);
+ }
+
+ /**
+ * Process any condition await calls on this node.
+ */
+ private String processAwait() {
+ if (lastCondition == null)
+ return null;
+
+ String ret = lastCondition;
+
+ lastCondition = null;
+
+ return ret;
+ }
+
+ /** */
+ private Map<String, Integer> processSignal() {
+ Map<String,Integer> ret = new HashMap<>(outgoingSignals);
+
+ outgoingSignals.clear();
+
+ return ret;
+ }
+
+ /** Interrupt every thread on this node waiting on this lock. */
+ private synchronized void interruptAll() {
+ // First release all threads waiting on associated condition queues.
+ if (!conditionMap.isEmpty()) {
+ // Temporarily obtain ownership of the lock,
+ // in order to signal all conditions.
+ UUID tempUUID = getOwnerNode();
+
+ long tempThreadID = currentOwnerThreadId;
+
+ setCurrentOwnerNode(thisNode);
+
+ currentOwnerThreadId = Thread.currentThread().getId();
+
+ for (Condition c : conditionMap.values())
+ c.signalAll();
+
+ // Restore owner node and owner thread.
+ setCurrentOwnerNode(tempUUID);
+
+ currentOwnerThreadId = tempThreadID;
+ }
+
+ // Interrupt any future call to acquire/release on this sync object.
+ interruptAll = true;
+
+ // Interrupt any ongoing transactions.
+ for (Thread t: getQueuedThreads())
+ t.interrupt();
+ }
+
+ /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode),
+ * if not throw {@linkplain IgniteInterruptedException} */
+ private void validate(final boolean throwInterrupt) {
+ // Interrupted flag shouldn't be always cleared
+ // (e.g. lock() method doesn't throw exception and doesn't clear interrupted)
+ // but should be cleared if this method is called after lock breakage or node stop.
+ // If interruptAll is set, exception is thrown anyway.
+ boolean interrupted = Thread.currentThread().isInterrupted();
+
+ // Clear interrupt flag.
+ if (throwInterrupt || interruptAll)
+ Thread.interrupted();
+
+ if (interruptAll)
+ throw new IgniteException("Lock broken (possible reason: node stopped" +
+ " or node owning lock failed while in non-failoversafe mode).");
+
+ // Global queue should be synchronized only if interrupted exception should be thrown.
+ if (fair && (throwInterrupt && interrupted) && !interruptAll) {
+ synchronizeQueue(true, Thread.currentThread());
+
+ throw new IgniteInterruptedException("Lock is interrupted.");
+ }
+ }
+
+ /**
+ * Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain
+ * GridCacheLockImpl#onUpdate(GridCacheLockState)}.
+ *
+ * @param permits Number of permits acquired at this reentrant lock.
+ */
+ final synchronized void setPermits(int permits) {
+ setState(permits);
+ }
+
+ /**
+ * Gets the number of permissions currently acquired at this lock.
+ *
+ * @return Number of permits acquired at this reentrant lock.
+ */
+ final int getPermits() {
+ return getState();
+ }
+
+ /**
+ * Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain
+ * GridCacheLockImpl#onUpdate(GridCacheLockState)}.
+ *
+ * @param ownerNode UUID of the node owning this lock.
+ */
+ final synchronized void setCurrentOwnerNode(UUID ownerNode) {
+ currentOwnerNode = ownerNode;
+ }
+
+ /**
+ * Gets the UUID of the node that currently owns the lock.
+ *
+ * @return UUID of the node that currently owns the lock.
+ */
+ final UUID getOwnerNode() {
+ return currentOwnerNode;
+ }
+
+ /**
+ * Checks if latest call to acquire/release was called on this node.
+ * Should only be called from update method.
+ *
+ * @param newOwnerID ID of the node that is about to acquire this lock (or null).
+ * @return true if acquire/release that triggered last update came from this node.
+ */
+ protected boolean isLockedLocally(UUID newOwnerID) {
+ return thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID);
+ }
+
+ protected void setCurrentOwnerThread(long newOwnerThreadId) {
+ currentOwnerThreadId = newOwnerThreadId;
+ }
+
+ /**
+ * Returns true if node that owned the locked failed before call to unlock.
+ *
+ * @return true if any node failed while owning the lock.
+ */
+ protected boolean isBroken() {
+ return isBroken;
+ }
+
+ /** */
+ protected void setBroken(boolean isBroken) {
+ this.isBroken = isBroken;
+ }
+
+ /** */
+ protected synchronized boolean hasPredecessor(LinkedList<UUID> nodes) {
+ if (!fair)
+ return false;
+
+ for (Iterator<UUID> it = nodes.iterator(); it.hasNext(); ) {
+ UUID node = it.next();
+
+ if (ctx.discovery().node(node) == null) {
+ it.remove();
+
+ continue;
+ }
+
+ return !node.equals(thisNode);
+ }
+
+ return false;
+ }
+
+ /**
+ * Performs tryLock.
+ * @param acquires Number of permits to acquire.
+ * @param fair Fairness parameter.
+ * @return {@code True} if succeeded, false otherwise.
+ */
+ final boolean tryAcquire(final int acquires, final boolean fair) {
+ // If broken in non-failoversafe mode, exit immediately.
+ if (interruptAll)
+ return true;
+
+ final Thread current = Thread.currentThread();
+
+ boolean failed = false;
+
+ int c = getState();
+
+ // Wait for lock to reach stable state.
+ while (c != 0) {
+ UUID currentOwner = currentOwnerNode;
+
+ if (currentOwner != null) {
+ failed = ctx.discovery().node(currentOwner) == null;
+
+ break;
+ }
+
+ c = getState();
+ }
+
+ // Check if lock is released or current owner failed.
+ if (c == 0 || failed) {
+ if (compareAndSetGlobalState(0, acquires, current, fair)) {
+
+ // Not used for synchronization (we use ThreadID), but updated anyway.
+ setExclusiveOwnerThread(current);
+
+ while (!isHeldExclusively() && !interruptAll)
+ Thread.yield();
+
+ return true;
+ }
+ }
+ else if (isHeldExclusively()) {
+ int nextc = c + acquires;
+
+ if (nextc < 0) // overflow
+ throw new Error("Maximum lock count exceeded.");
+
+ setState(nextc);
+
+ return true;
+ }
+
+ if (fair && !isQueued(current))
+ synchronizeQueue(false, current);
+
+ return false;
+ }
+
+ /**
+ * Performs lock.
+ */
+ final void lock() {
+ acquire(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected final boolean tryAcquire(int acquires) {
+ return tryAcquire(acquires, fair);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected final boolean tryRelease(int releases) {
+ // This method is called with release==0 only when trying to wake through update,
+ // to check if some other node released the lock.
+ if (releases == 0)
+ return true;
+
+ // If broken in non-failoversafe mode, exit immediately.
+ if (interruptAll)
+ return true;
+
+ int c = getState() - releases;
+
+ if (!isHeldExclusively()) {
+ log.error("Lock.unlock() is called in illegal state [callerNodeId=" + thisNode + ", ownerNodeId="
+ + currentOwnerNode + ", callerThreadId=" + Thread.currentThread().getId() + ", ownerThreadId="
+ + currentOwnerThreadId + ", lockState=" + getState() + "]");
+
+ throw new IllegalMonitorStateException();
+ }
+
+ boolean free = false;
+
+ if (c == 0) {
+ free = true;
+
+ setGlobalState(0, processAwait(), processSignal());
+
+ while (isHeldExclusively() && !interruptAll)
+ Thread.yield();
+ }
+ else
+ setState(c);
+
+ return free;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected final boolean isHeldExclusively() {
+ // While we must in general read state before owner,
+ // we don't need to do so to check if current thread is owner
+
+ return currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode);
+ }
+
+ /**
+ * @param name Condition name.
+ * @return Condition object.
+ */
+ final synchronized IgniteCondition newCondition(String name) {
+ if (conditionMap.containsKey(name))
+ return new IgniteConditionObject(name, conditionMap.get(name));
+
+ ConditionObject cond = new ConditionObject();
+
+ conditionMap.put(name, cond);
+
+ return new IgniteConditionObject(name, cond);
+ }
+
+ // Methods relayed from outer class
+
+ final int getHoldCount() {
+ return isHeldExclusively() ? getState() : 0;
+ }
+
+ final boolean isLocked() throws IgniteCheckedException {
+ return getState() != 0 || lockView.get(key).get() != 0;
+ }
+
+ /**
+ * This method is used for synchronizing the reentrant lock state across all nodes.
+ */
+ protected boolean compareAndSetGlobalState(final int expVal, final int newVal,
+ final Thread newThread, final boolean bargingProhibited) {
+ try {
+ return CU.outTx(
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+
+ GridCacheLockState val = lockView.get(key);
+
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+ final long newThreadID = newThread.getId();
+
+ LinkedList<UUID> nodes = val.getNodes();
+
+ // Barging is prohibited in fair mode unless tryLock() is called.
+ if (!(bargingProhibited && hasPredecessor(nodes))) {
+ if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) {
+ val.set(newVal);
+
+ val.setId(thisNode);
+
+ val.setThreadId(newThreadID);
+
+ val.setSignals(null);
+
+ // This node is already in queue, except in cases where this is the only node
+ // or this is a call to tryLock(), in which case barging is ok.
+ // Queue is only updated if this is fair lock.
+ if (val.isFair() && (nodes.isEmpty() || !bargingProhibited))
+ nodes.addFirst(thisNode);
+
+ val.setNodes(nodes);
+
+ val.setChanged(true);
+
+ lockView.put(key, val);
+
+ tx.commit();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+ catch (Exception e) {
+ if (interruptAll) {
+ log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
+ " aborting transaction.");
+
+ // Return immediately, exception will be thrown later.
+ return true;
+ }
+ else {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Thread is interrupted while attempting to acquire lock.");
+
+ // Delegate the decision to throw InterruptedException to the AQS.
+ sync.release(0);
+
+ return false;
+ }
+
+ U.error(log, "Failed to compare and set: " + this, e);
+ }
+
+ throw e;
+ }
+ }
+ }),
+ ctx
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * This method is used for synchronizing the number of acquire attempts on this lock across all nodes.
+ *
+ * @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered.
+ */
+ protected boolean synchronizeQueue(final boolean cancelled, final Thread thread) {
+ final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+ try {
+ return CU.outTx(
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheLockState val = lockView.get(key);
+
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+ LinkedList<UUID> nodes = val.getNodes();
+
+ if (!cancelled) {
+ nodes.add(thisNode);
+
+ val.setChanged(false);
+
+ lockView.put(key, val);
+
+ tx.commit();
+
+ // Keep track of all threads that are queued in global queue.
+ // We deliberately don't use #sync.isQueued(), because AQS
+ // cancel threads immediately after throwing interrupted exception.
+ sync.waitingThreads.add(thread.getId());
+
+ return true;
+ }
+ else {
+ if (sync.waitingThreads.contains(thread.getId())) {
+ // Update other nodes if this is the first node in queue.
+ val.setChanged(nodes.lastIndexOf(thisNode) == 0);
+
+ nodes.removeLastOccurrence(thisNode);
+
+ lockView.put(key, val);
+
+ tx.commit();
+
+ sync.waitingThreads.remove(thread.getId());
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+ catch (Exception e) {
+ if (interruptAll) {
+ log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
+ " aborting transaction.");
+
+ // Abort this attempt to synchronize queue and start another one,
+ // that will return immediately.
+ sync.release(0);
+
+ return false;
+ }
+ else {
+ // If thread got interrupted, abort this attempt to synchronize queue,
+ // clear interrupt flag and try again, and let the AQS decide
+ // whether to throw an exception or ignore it.
+ if (Thread.interrupted() || X.hasCause(e, InterruptedException.class)) {
+ interrupted.set(true);
+
+ throw new TransactionRollbackException("Thread got interrupted " +
+ "while synchronizing the global queue, retrying. ");
+ }
+
+ U.error(log, "Failed to synchronize global lock queue: " + this, e);
+ }
+
+ throw e;
+ }
+ }
+ }),
+ ctx
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ // Restore interrupt flag and let AQS decide what to do with it.
+ if (interrupted.get())
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Sets the global state across all nodes after releasing the reentrant lock.
+ *
+ * @param newVal New state.
+ * @param lastCondition Id of the condition await is called.
+ * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock.
+ */
+ protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) {
+ try {
+ return CU.outTx(
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheLockState val = lockView.get(key);
+
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+ val.set(newVal);
+
+ if (newVal == 0) {
+ val.setId(null);
+
+ val.setThreadId(LOCK_FREE);
+ }
+
+ val.setChanged(true);
+
+ // If this lock is fair, remove this node from queue.
+ if (val.isFair() && newVal == 0) {
+ UUID removedNode = val.getNodes().removeFirst();
+
+ assert(thisNode.equals(removedNode));
+ }
+
+ // Get global condition queue.
+ Map<String, LinkedList<UUID>> condMap = val.getConditionMap();
+
+ // Create map containing signals from this node.
+ Map<UUID, LinkedList<String>> signalMap = new HashMap<>();
+
+ // Put any signal calls on this node to global state.
+ if (!outgoingSignals.isEmpty()) {
+ for (String condition : outgoingSignals.keySet()) {
+ int cnt = outgoingSignals.get(condition);
+
+ // Get queue for this condition.
+ List<UUID> list = condMap.get(condition);
+
+ if (list != null && !list.isEmpty()) {
+ // Check if signalAll was called.
+ if (cnt == 0) {
+ cnt = list.size();
+ }
+
+ // Remove from global condition queue.
+ for (int i = 0; i < cnt; i++) {
+ if (list.isEmpty())
+ break;
+
+ UUID uuid = list.remove(0);
+
+ // Skip if node to be released is not alive anymore.
+ if (ctx.discovery().node(uuid) == null) {
+ cnt++;
+
+ continue;
+ }
+
+ LinkedList<String> queue = signalMap.get(uuid);
+
+ if (queue == null) {
+ queue = new LinkedList<>();
+
+ signalMap.put(uuid, queue);
+ }
+
+ queue.add(condition);
+ }
+ }
+ }
+ }
+
+ val.setSignals(signalMap);
+
+ // Check if this release is called after condition.await() call;
+ // If true, add this node to the global waiting queue.
+ if (lastCondition != null) {
+ LinkedList<UUID> queue;
+
+ //noinspection IfMayBeConditional
+ if (!condMap.containsKey(lastCondition))
+ // New condition object.
+ queue = new LinkedList<>();
+ else
+ // Existing condition object.
+ queue = condMap.get(lastCondition);
+
+ queue.add(thisNode);
+
+ condMap.put(lastCondition, queue);
+ }
+
+ val.setConditionMap(condMap);
+
+ lockView.put(key, val);
+
+ tx.commit();
+
+ return true;
+ }
+ catch (Exception e) {
+ if (interruptAll) {
+ log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
+ " aborting transaction.");
+
+ return true;
+ }
+ else
+ U.error(log, "Failed to release: " + this, e);
+
+ throw e;
+ }
+ }
+ }),
+ ctx
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ protected synchronized boolean checkIncomingSignals(GridCacheLockState state) {
+ if (state.getSignals() == null)
+ return false;
+
+ LinkedList<String> signals = state.getSignals().get(thisNode);
+
+ if (signals == null || signals.isEmpty())
+ return false;
+
+ UUID tempUUID = getOwnerNode();
+
+ Thread tempThread = getExclusiveOwnerThread();
+
+ long tempThreadID = currentOwnerThreadId;
+
+ // Temporarily allow current thread to signal condition object.
+ // This is safe to do because:
+ // 1. if release was called on this node,
+ // it was called from currently active thread;
+ // 2. if release came from a thread on any other node,
+ // all threads on this node are already blocked.
+ setCurrentOwnerNode(thisNode);
+
+ setExclusiveOwnerThread(Thread.currentThread());
+
+ currentOwnerThreadId = Thread.currentThread().getId();
+
+ for (String signal: signals)
+ conditionMap.get(signal).signal();
+
+ // Restore owner node and owner thread.
+ setCurrentOwnerNode(tempUUID);
+
+ setExclusiveOwnerThread(tempThread);
+
+ currentOwnerThreadId = tempThreadID;
+
+ return true;
+ }
+
+ /**
+ * Condition implementation for {@linkplain IgniteLock}.
+ *
+ **/
+ private class IgniteConditionObject implements IgniteCondition {
+ /** */
+ private final String name;
+
+ /** */
+ private final AbstractQueuedSynchronizer.ConditionObject object;
+
+ /**
+ * @param name Condition name.
+ * @param object Condition object.
+ */
+ protected IgniteConditionObject(String name, ConditionObject object) {
+ this.name = name;
+
+ this.object = object;
+ }
+
+ /**
+ * Name of this condition.
+ *
+ * @return name Name of this condition object.
+ */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void await() throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ lastCondition = name;
+
+ object.await();
+
+ sync.validate(true);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitUninterruptibly() {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ lastCondition = name;
+
+ object.awaitUninterruptibly();
+
+ sync.validate(false);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ lastCondition = name;
+
+ long result = object.awaitNanos(nanosTimeout);
+
+ sync.validate(true);
+
+ return result;
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ lastCondition = name;
+
+ boolean result = object.await(time, unit);
+
+ sync.validate(true);
+
+ return result;
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ lastCondition = name;
+
+ boolean result = object.awaitUntil(deadline);
+
+ sync.validate(true);
+
+ return result;
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void signal() {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ validate(false);
+
+ addOutgoingSignal(name);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void signalAll() {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ if (!isHeldExclusively())
+ throw new IllegalMonitorStateException();
+
+ sync.validate(false);
+
+ addOutgoingSignalAll(name);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Reentrant lock name.
+ * @param key Reentrant lock key.
+ * @param lockView Reentrant lock projection.
+ * @param ctx Cache context.
+ */
+ @SuppressWarnings("unchecked")
+ public GridCacheLockImpl(String name,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView,
+ GridCacheContext ctx) {
+ assert name != null;
+ assert key != null;
+ assert ctx != null;
+ assert lockView != null;
+
+ this.name = name;
+ this.key = key;
+ this.lockView = lockView;
+ this.ctx = ctx;
+
+ log = ctx.logger(getClass());
+ }
+
+ /**
+ * @throws IgniteCheckedException If operation failed.
+ */
+ private void initializeReentrantLock() throws IgniteCheckedException {
+ if (initGuard.compareAndSet(false, true)) {
+ try {
+ sync = CU.outTx(
+ retryTopologySafe(new Callable<Sync>() {
+ @Override public Sync call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheLockState val = lockView.get(key);
+
+ if (val == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find reentrant lock with given name: " + name);
+
+ return null;
+ }
+
+ tx.rollback();
+
+ return new Sync(val);
+ }
+ }
+ }),
+ ctx
+ );
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized internal sync structure: " + sync);
+ }
+ finally {
+ initLatch.countDown();
+ }
+ }
+ else {
+ U.await(initLatch);
+
+ if (sync == null)
+ throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdate(GridCacheLockState val) {
+ // Called only on initialization, so it's safe to ignore update.
+ if (sync == null)
+ return;
+
+ updateLock.lock();
+
+ try {
+ // If this update is a result of unsuccessful acquire in fair mode, no local update should be done.
+ if (!val.isChanged())
+ return;
+
+ // Check if update came from this node.
+ boolean local = sync.isLockedLocally(val.getId());
+
+ // Process any incoming signals.
+ boolean incomingSignals = sync.checkIncomingSignals(val);
+
+ // Update permission count.
+ sync.setPermits(val.get());
+
+ // Update owner's node id.
+ sync.setCurrentOwnerNode(val.getId());
+
+ // Update owner's thread id.
+ sync.setCurrentOwnerThread(val.getThreadId());
+
+ // Check if any threads waiting on this node need to be notified.
+ if ((incomingSignals || sync.getPermits() == 0) && !local) {
+ // Try to notify any waiting threads.
+ sync.release(0);
+ }
+
+ } finally{
+ updateLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onNodeRemoved(UUID nodeId) {
+ updateLock.lock();
+
+ try {
+ if (nodeId.equals(sync.getOwnerNode())) {
+ sync.setBroken(true);
+
+ if (!sync.failoverSafe) {
+ sync.interruptAll();
+ }
+ }
+
+ // Try to notify any waiting threads.
+ sync.release(0);
+ }
+ finally {
+ updateLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onStop() {
+ if (sync == null) {
+ interruptAll = true;
+
+ return;
+ }
+
+ sync.setBroken(true);
+
+ sync.interruptAll();
+
+ // Try to notify any waiting threads.
+ sync.release(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void lock() {
+ ctx.kernalContext().gateway().readLock();
+
+ try{
+ initializeReentrantLock();
+
+ sync.lock();
+
+ sync.validate(false);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void lockInterruptibly() throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ initializeReentrantLock();
+
+ sync.acquireInterruptibly(1);
+
+ sync.validate(true);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (InterruptedException e) {
+ if (sync.fair)
+ sync.synchronizeQueue(true, Thread.currentThread());
+
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryLock() {
+ ctx.kernalContext().gateway().readLock();
+
+ try{
+ initializeReentrantLock();
+
+ boolean result = sync.tryAcquire(1, false);
+
+ sync.validate(false);
+
+ return result;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
+ try{
+ initializeReentrantLock();
+
+ boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout));
+
+ sync.validate(true);
+
+ return result;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (InterruptedException e) {
+ if (sync.fair)
+ sync.synchronizeQueue(true, Thread.currentThread());
+
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unlock() {
+ ctx.kernalContext().gateway().readLock();
+
+ try{
+ initializeReentrantLock();
+
+ // Validate before release.
+ sync.validate(false);
+
+ sync.release(1);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ @NotNull @Override public Condition newCondition() {
+ throw new UnsupportedOperationException("IgniteLock does not allow creation of nameless conditions. ");
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCondition getOrCreateCondition(String name) {
+ ctx.kernalContext().gateway().readLock();
+
+ try{
+ initializeReentrantLock();
+
+ IgniteCondition result = sync.newCondition(name);
+
+ sync.validate(false);
+
+ return result;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getHoldCount() {
+ try{
+ initializeReentrantLock();
+
+ return sync.getHoldCount();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isHeldByCurrentThread() {
+ try{
+ initializeReentrantLock();
+
+ return sync.isHeldExclusively();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLocked() {
+ try{
+ initializeReentrantLock();
+
+ return sync.isLocked();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasQueuedThreads() {
+ try{
+ initializeReentrantLock();
+
+ return sync.hasQueuedThreads();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasQueuedThread(Thread thread) {
+ try{
+ initializeReentrantLock();
+
+ return sync.isQueued(thread);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasWaiters(IgniteCondition condition) {
+ try{
+ initializeReentrantLock();
+
+ AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
+
+ if (c == null)
+ throw new IllegalArgumentException();
+
+ return sync.hasWaiters(c);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getWaitQueueLength(IgniteCondition condition) {
+ try{
+ initializeReentrantLock();
+
+ AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
+
+ if (c == null)
+ throw new IllegalArgumentException();
+
+ return sync.getWaitQueueLength(c);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ @Override public boolean isFailoverSafe() {
+ try{
+ initializeReentrantLock();
+
+ return sync.failoverSafe;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ @Override public boolean isFair() {
+ try{
+ initializeReentrantLock();
+
+ return sync.fair;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isBroken() {
+ try{
+ initializeReentrantLock();
+
+ return sync.isBroken();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheInternalKey key() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return rmvd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onRemoved() {
+ return rmvd = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void needCheckNotRemoved() {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ stash.set(in.readUTF());
+ }
+
+ /**
+ * Reconstructs object on unmarshalling.
+ *
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
+ */
+ private Object readResolve() throws ObjectStreamException {
+ String name = stash.get();
+
+ assert name != null;
+
+ try {
+ IgniteLock lock = IgnitionEx.localIgnite().context().dataStructures().reentrantLock(
+ name,
+ false,
+ false,
+ false);
+
+ if (lock == null)
+ throw new IllegalStateException("Lock was not found on deserialization: " + name);
+
+ return lock;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ }
+ finally {
+ stash.remove();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ if (!rmvd) {
+ try {
+ boolean force = sync != null && (sync.isBroken() && !sync.failoverSafe);
+
+ ctx.kernalContext().dataStructures().removeReentrantLock(name, force);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheLockImpl.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
new file mode 100644
index 0000000..3feb9bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Grid cache reentrant lock state.
+ */
+public final class GridCacheLockState implements GridCacheInternal, Externalizable, Cloneable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Count. */
+ private int cnt;
+
+ /** Owner thread local ID. */
+ private long threadId;
+
+ /** Owner node ID. */
+ private UUID id;
+
+ /** FailoverSafe flag. */
+ private boolean failoverSafe;
+
+ /** Map containing state for each condition object associated with this lock. */
+ @GridToStringInclude
+ private Map<String, LinkedList<UUID>> conditionMap;
+
+ /** Map containing unprocessed signals for condition objects that are associated with this lock. */
+ @GridToStringInclude
+ private Map<UUID, LinkedList<String>> signals;
+
+ /** Flag indicating lock fairness. */
+ private boolean fair;
+
+ /** Queue containing nodes that are waiting to acquire this lock, used to ensure fairness. */
+ @GridToStringInclude
+ private LinkedList<UUID> nodes;
+
+ /**
+ * Flag indicating that global state changed.
+ * Used in fair mode to ensure that only successful acquires and releases trigger update.
+ */
+ private boolean changed;
+
+ /**
+ * Constructor.
+ *
+ * @param cnt Initial count.
+ * @param id UUID of owning node.
+ * @param threadID ID of the current thread.
+ * @param failoverSafe true if created in failoverSafe mode.
+ * @param fair true if created in fair mode.
+ */
+ public GridCacheLockState(int cnt, UUID id, long threadID, boolean failoverSafe, boolean fair) {
+ assert cnt >= 0;
+
+ this.id = id;
+
+ this.threadId = threadID;
+
+ conditionMap = new HashMap();
+
+ signals = null;
+
+ nodes = new LinkedList<UUID>();
+
+ this.fair = fair;
+
+ this.failoverSafe = failoverSafe;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridCacheLockState() {
+ // No-op.
+ }
+
+ /**
+ * @param cnt New count.
+ */
+ public void set(int cnt) {
+ this.cnt = cnt;
+ }
+
+ /**
+ * @return Current count.
+ */
+ public int get() {
+ return cnt;
+ }
+
+ /**
+ * @return Current owner thread ID.
+ */
+ public long getThreadId() {
+ return threadId;
+ }
+
+ /**
+ * @param threadId New thread owner ID.
+ */
+ public void setThreadId(long threadId) {
+ this.threadId = threadId;
+ }
+
+ /**
+ * @return Current owner node ID.
+ */
+ public UUID getId() {
+ return id;
+ }
+
+ /**
+ * @return New owner node ID.
+ */
+ public void setId(UUID id) {
+ this.id = id;
+ }
+
+ /**
+ * @return Failover safe flag.
+ */
+ public boolean isFailoverSafe() {
+ return failoverSafe;
+ }
+
+ /**
+ * @return Condition count.
+ */
+ public int condtionCount(){
+ return conditionMap.size();
+ }
+
+ /**
+ * @return Condition map.
+ */
+ public Map<String, LinkedList<UUID>> getConditionMap() {
+ return conditionMap;
+ }
+
+ /**
+ * @param conditionMap Condition map.
+ */
+ public void setConditionMap(Map<String, LinkedList<UUID>> conditionMap) {
+ this.conditionMap = conditionMap;
+ }
+
+ /**
+ * @return Signals.
+ */
+ public Map<UUID, LinkedList<String>> getSignals() {
+ return signals;
+ }
+
+ /**
+ * @param signals Signals.
+ */
+ public void setSignals(Map<UUID, LinkedList<String>> signals) {
+ this.signals = signals;
+ }
+
+ /**
+ * @return Nodes.
+ */
+ public LinkedList<UUID> getNodes() {
+ return nodes;
+ }
+
+ /**
+ * @param nodes Nodes.
+ */
+ public void setNodes(LinkedList<UUID> nodes) {
+ this.nodes = nodes;
+ }
+
+ /**
+ * @return Fair flag.
+ */
+ public boolean isFair() {
+ return fair;
+ }
+
+ /**
+ * @return Changed flag.
+ */
+ public boolean isChanged() {
+ return changed;
+ }
+
+ /**
+ * @param changed Changed flag.
+ */
+ public void setChanged(boolean changed) {
+ this.changed = changed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(cnt);
+ out.writeLong(threadId);
+ U.writeUuid(out, id);
+
+ out.writeBoolean(failoverSafe);
+
+ out.writeBoolean(fair);
+
+ out.writeBoolean(changed);
+
+ out.writeBoolean(conditionMap != null);
+
+ if (conditionMap != null) {
+ out.writeInt(conditionMap.size());
+
+ for (Map.Entry<String, LinkedList<UUID>> e : conditionMap.entrySet()) {
+ U.writeString(out, e.getKey());
+
+ out.writeInt(e.getValue().size());
+
+ for (UUID uuid:e.getValue())
+ U.writeUuid(out, uuid);
+ }
+ }
+
+ out.writeBoolean(signals != null);
+
+ if (signals != null) {
+ out.writeInt(signals.size());
+
+ for (Map.Entry<UUID, LinkedList<String>> e : signals.entrySet()) {
+ U.writeUuid(out, e.getKey());
+
+ out.writeInt(e.getValue().size());
+
+ for (String condition:e.getValue())
+ U.writeString(out, condition);
+ }
+ }
+
+ out.writeBoolean(nodes != null);
+
+ if (nodes != null) {
+ out.writeInt(nodes.size());
+
+ for (UUID uuid: nodes)
+ U.writeUuid(out, uuid);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException {
+ cnt = in.readInt();
+ threadId = in.readLong();
+ id = U.readUuid(in);
+
+ failoverSafe = in.readBoolean();
+
+ fair = in.readBoolean();
+
+ changed = in.readBoolean();
+
+ if (in.readBoolean()) {
+ int size = in.readInt();
+
+ conditionMap = U.newLinkedHashMap(size);
+
+ for (int i = 0; i < size; i++) {
+ String key = U.readString(in);
+
+ int size1 = in.readInt();
+
+ LinkedList<UUID> list = new LinkedList();
+
+ for (int j = 0; j < size1; j++)
+ list.add(U.readUuid(in));
+
+ conditionMap.put(key, list);
+ }
+ }
+
+ if (in.readBoolean()) {
+ assert (conditionMap != null);
+
+ int size = in.readInt();
+
+ signals = U.newLinkedHashMap(size);
+
+ for (int i = 0; i < size; i++) {
+ UUID node = U.readUuid(in);
+
+ int size1 = in.readInt();
+
+ LinkedList<String> list = new LinkedList();
+
+ for (int j = 0; j < size1; j++)
+ list.add(U.readString(in));
+
+ signals.put(node, list);
+ }
+ }
+ else
+ signals = null;
+
+ if (in.readBoolean()) {
+ int size = in.readInt();
+
+ nodes = new LinkedList();
+
+ for (int i = 0; i < size; i++)
+ nodes.add(U.readUuid(in));
+ }
+ else
+ nodes = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheLockState.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index c365f9d..8f196be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.datastructures;
import java.io.Externalizable;
import java.io.IOException;
+import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.ObjectStreamException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -32,7 +34,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -885,6 +889,35 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
t.set2(in.readUTF());
}
+ /**
+ * Reconstructs object on unmarshalling.
+ *
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
+ */
+ private Object readResolve() throws ObjectStreamException {
+ try {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ IgniteSemaphore sem = IgnitionEx.localIgnite().context().dataStructures().semaphore(
+ t.get2(),
+ 0,
+ false,
+ false);
+
+ if (sem == null)
+ throw new IllegalStateException("Semaphore was not found on deserialization: " + t.get2());
+
+ return sem;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ }
+ finally {
+ stash.remove();
+ }
+ }
+
/** {@inheritDoc} */
@Override public void close() {
if (!rmvd) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 034c314..9ae74a7 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1037,6 +1037,8 @@ org.apache.ignite.internal.processors.datastructures.GridCacheQueueItemKey
org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy
org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreImpl
org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreState
+org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl
+org.apache.ignite.internal.processors.datastructures.GridCacheLockState
org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader
org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey
org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$CollocatedItemKey
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 4653ce9..4622fff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.testframework.GridTestUtils;
@@ -774,4 +775,61 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertFalse(srvSemaphore.tryAcquire());
assertFalse(srvSemaphore.tryAcquire());
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReentrantLockReconnect() throws Exception {
+ testReentrantLockReconnect(false);
+
+ testReentrantLockReconnect(true);
+ }
+
+ private void testReentrantLockReconnect(final boolean fair) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true);
+
+ assertEquals(false, clientLock.isLocked());
+
+ final IgniteLock srvLock = srv.reentrantLock("lock1", true, fair, true);
+
+ assertEquals(false, srvLock.isLocked());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvLock.lock();
+ }
+ });
+
+ assertTrue(srvLock.isLocked());
+ assertTrue(clientLock.isLocked());
+
+ assertEquals(1, srvLock.getHoldCount());
+
+ srvLock.lock();
+
+ assertTrue(srvLock.isLocked());
+ assertTrue(clientLock.isLocked());
+
+ assertEquals(2, srvLock.getHoldCount());
+
+ srvLock.unlock();
+
+ assertTrue(srvLock.isLocked());
+ assertTrue(clientLock.isLocked());
+
+ assertEquals(1, srvLock.getHoldCount());
+
+ srvLock.unlock();
+
+ assertFalse(srvLock.isLocked());
+ assertFalse(clientLock.isLocked());
+
+ assertEquals(0, srvLock.getHoldCount());
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 74023e9..6ba65ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -33,9 +33,11 @@ import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.cache.CacheMode;
@@ -75,10 +77,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
private static final String TRANSACTIONAL_CACHE_NAME = "tx_cache";
/** */
- private static final int TOP_CHANGE_CNT = 5;
+ private static final int TOP_CHANGE_CNT = 2;
/** */
- private static final int TOP_CHANGE_THREAD_CNT = 3;
+ private static final int TOP_CHANGE_THREAD_CNT = 2;
/** */
private boolean client;
@@ -586,6 +588,208 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
+ public void testReentrantLockFailsWhenServersLeft() throws Exception {
+ testReentrantLockFailsWhenServersLeft(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFairReentrantLockFailsWhenServersLeft() throws Exception {
+ testReentrantLockFailsWhenServersLeft(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReentrantLockFailsWhenServersLeft(final boolean fair) throws Exception {
+ client = true;
+
+ Ignite client = startGrid(gridCount());
+
+ Ignite server = grid(0);
+
+ // Initialize lock.
+ IgniteLock srvLock = server.reentrantLock("lock", true, fair, true);
+
+ IgniteSemaphore semaphore = server.semaphore("sync", 0, true, true);
+
+ IgniteCompute compute = client.compute().withAsync();
+
+ compute.apply(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ final IgniteLock l = ignite.reentrantLock("lock", true, fair, true);
+
+ l.lock();
+
+ assertTrue(l.isHeldByCurrentThread());
+
+ l.unlock();
+
+ assertFalse(l.isHeldByCurrentThread());
+
+ // Signal the server to go down.
+ ignite.semaphore("sync", 0, true, true).release();
+
+ boolean isExceptionThrown = false;
+
+ try {
+ // Wait for the server to go down.
+ Thread.sleep(1000);
+
+ l.lock();
+
+ fail("Exception must be thrown.");
+ }
+ catch (InterruptedException e) {
+ fail("Interrupted exception not expected here.");
+ }
+ catch (IgniteException e) {
+ isExceptionThrown = true;
+ }
+ finally {
+ assertTrue(isExceptionThrown);
+
+ assertFalse(l.isHeldByCurrentThread());
+ }
+ return null;
+ }
+ }, client);
+
+ // Wait for the lock on client to be acquired then released.
+ semaphore.acquire();
+
+ for (int i = 0; i < gridCount(); i++)
+ stopGrid(i);
+
+ compute.future().get();
+
+ client.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReentrantLockConstantTopologyChangeFailoverSafe() throws Exception {
+ doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+ doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception {
+ doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception {
+ doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFairReentrantLockConstantTopologyChangeFailoverSafe() throws Exception {
+ doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFairReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+ doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception {
+ doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFairReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception {
+ doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe, final boolean fair) throws Exception {
+ try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, fair, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false);
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ try{
+ l.lock();
+ }
+ finally {
+ done.set(true);
+ }
+
+ return null;
+ }
+ });
+
+ // Wait until l.lock() has been called.
+ while(!l.hasQueuedThreads() && !done.get()){
+ // No-op.
+ }
+
+ return null;
+ }
+ });
+
+ while (!fut.isDone()) {
+ while (true) {
+ try {
+ lock.lock();
+ }
+ catch (IgniteException e) {
+ // Exception may happen in non-failoversafe mode.
+ if (failoverSafe)
+ throw e;
+ }
+ finally {
+ // Broken lock cannot be used in non-failoversafe mode.
+ if(!lock.isBroken() || failoverSafe) {
+ assertTrue(lock.isHeldByCurrentThread());
+
+ lock.unlock();
+
+ assertFalse(lock.isHeldByCurrentThread());
+ }
+ break;
+ }
+ }
+ }
+
+ fut.get();
+
+ for (Ignite g : G.allGrids())
+ assertFalse(g.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false).isHeldByCurrentThread());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCountDownLatchConstantTopologyChange() throws Exception {
doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index 34e7080..5929d42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
@@ -327,6 +329,74 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
/**
* @throws Exception If failed.
*/
+ public void testReentrantLock() throws Exception {
+ Ignite clientNode = clientIgnite();
+ Ignite srvNode = serverNode();
+
+ testReentrantLock(clientNode, srvNode);
+ testReentrantLock(srvNode, clientNode);
+ }
+
+ /**
+ * @param creator Creator node.
+ * @param other Other node.
+ * @throws Exception If failed.
+ */
+ private void testReentrantLock(Ignite creator, final Ignite other) throws Exception {
+ assertNull(creator.reentrantLock("lock1", true, false, false));
+ assertNull(other.reentrantLock("lock1", true, false, false));
+
+ try (IgniteLock lock = creator.reentrantLock("lock1", true, false, true)) {
+ assertNotNull(lock);
+
+ assertFalse(lock.isLocked());
+
+ final Semaphore semaphore = new Semaphore(0);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ IgniteLock lock0 = other.reentrantLock("lock1", true, false, false);
+
+ lock0.lock();
+
+ assertTrue(lock0.isLocked());
+
+ semaphore.release();
+
+ U.sleep(1000);
+
+ log.info("Release reentrant lock.");
+
+ lock0.unlock();
+
+ return null;
+ }
+ });
+
+ semaphore.acquire();
+
+ log.info("Try acquire lock.");
+
+ assertTrue(lock.tryLock(5000, TimeUnit.MILLISECONDS));
+
+ log.info("Finished wait.");
+
+ fut.get();
+
+ assertTrue(lock.isLocked());
+
+ lock.unlock();
+
+ assertFalse(lock.isLocked());
+ }
+
+ assertNull(creator.reentrantLock("lock1", true, false, false));
+ assertNull(other.reentrantLock("lock1", true, false, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testQueue() throws Exception {
Ignite clientNode = clientIgnite();
Ignite srvNode = serverNode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
index e88c97b..a8e0095 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -241,7 +242,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
private void testUniqueName(final boolean singleGrid) throws Exception {
final String name = IgniteUuid.randomUuid().toString();
- final int DS_TYPES = 8;
+ final int DS_TYPES = 9;
final int THREADS = DS_TYPES * 3;
@@ -322,6 +323,13 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
res = ignite.semaphore(name, 0, false, true);
break;
+
+ case 8:
+ log.info("Create atomic reentrant lock, grid: " + ignite.name());
+
+ res = ignite.reentrantLock(name, true, true, true);
+
+ break;
default:
fail();
@@ -361,7 +369,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
res instanceof IgniteCountDownLatch ||
res instanceof IgniteQueue ||
res instanceof IgniteSet ||
- res instanceof IgniteSemaphore);
+ res instanceof IgniteSemaphore ||
+ res instanceof IgniteLock);
log.info("Data structure created: " + dataStructure);