You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/11/21 01:29:07 UTC
[02/13] ignite git commit: ignite-638: Implement IgniteSemaphore data
structure
ignite-638: Implement IgniteSemaphore data structure
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e7e3309
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e7e3309
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e7e3309
Branch: refs/heads/master
Commit: 8e7e330904b80f9a13659fdd7cf7f12dd6a36037
Parents: 900788b
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Fri Nov 20 17:39:40 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Nov 20 17:39:40 2015 +0300
----------------------------------------------------------------------
.../datastructures/IgniteSemaphoreExample.java | 168 ++++
.../ignite/examples/CacheExamplesSelfTest.java | 10 +-
.../src/main/java/org/apache/ignite/Ignite.java | 17 +
.../java/org/apache/ignite/IgniteSemaphore.java | 312 ++++++++
.../apache/ignite/events/DiscoveryEvent.java | 6 +-
.../apache/ignite/internal/IgniteKernal.java | 21 +
.../datastructures/DataStructuresProcessor.java | 199 ++++-
.../datastructures/GridCacheSemaphoreEx.java | 47 ++
.../datastructures/GridCacheSemaphoreImpl.java | 763 +++++++++++++++++++
.../datastructures/GridCacheSemaphoreState.java | 144 ++++
.../IgniteClientReconnectAtomicsTest.java | 44 +-
...eAbstractDataStructuresFailoverSelfTest.java | 275 ++++++-
.../IgniteClientDataStructuresAbstractTest.java | 59 +-
.../IgniteDataStructureUniqueNameTest.java | 14 +-
.../IgniteSemaphoreAbstractSelfTest.java | 411 ++++++++++
.../local/IgniteLocalSemaphoreSelfTest.java | 98 +++
.../IgnitePartitionedSemaphoreSelfTest.java | 33 +
...eplicatedDataStructuresFailoverSelfTest.java | 2 +-
.../IgniteReplicatedSemaphoreSelfTest.java | 33 +
.../cache/GridCacheDataStructuresLoadTest.java | 283 ++++---
.../ignite/testframework/junits/IgniteMock.java | 10 +
.../junits/multijvm/IgniteProcessProxy.java | 7 +
.../org/apache/ignite/IgniteSpringBean.java | 12 +
23 files changed, 2837 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
new file mode 100644
index 0000000..1c078b0
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/**
+ * This example demonstrates cache based semaphore.
+ * <p>
+ * Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
+ */
+public class IgniteSemaphoreExample {
+ /** Number of items for each producer/consumer to produce/consume. */
+ private static final int OPS_COUNT = 100;
+
+ /** Number of producers. */
+ private static final int NUM_PRODUCERS = 10;
+
+ /** Number of consumers. */
+ private static final int NUM_CONSUMERS = 10;
+
+ /** Synchronization semaphore name. */
+ private static final String SEM_NAME = IgniteSemaphoreExample.class.getSimpleName();
+
+ /**
+ * Executes example.
+ *
+ * @param args Command line arguments, none required.
+ */
+ public static void main(String[] args) {
+ try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+ System.out.println();
+ System.out.println(">>> Cache atomic semaphore example started.");
+
+ // Synchronization semaphore: every producer and consumer releases one permit on it when done.
+ IgniteSemaphore syncSemaphore = ignite.semaphore(SEM_NAME, 0, false, true);
+
+ // Unique name for the semaphore shared by producers and consumers.
+ final String semaphoreName = UUID.randomUUID().toString();
+
+ // Create the shared semaphore up front; the closures look it up by name.
+ ignite.semaphore(semaphoreName, 0, false, true);
+
+ // Submit consumer jobs to the cluster asynchronously.
+ for (int i = 0; i < NUM_CONSUMERS; i++)
+ ignite.compute().withAsync().run(new Consumer(semaphoreName));
+
+ // Submit producer jobs to the cluster asynchronously.
+ for (int i = 0; i < NUM_PRODUCERS; i++)
+ ignite.compute().withAsync().run(new Producer(semaphoreName));
+
+ System.out.println("Master node is waiting for all other nodes to finish...");
+
+ // Block until every producer and consumer has released its completion permit.
+ syncSemaphore.acquire(NUM_CONSUMERS + NUM_PRODUCERS);
+ }
+
+ System.out.flush();
+ System.out.println();
+ System.out.println("Finished semaphore example...");
+ System.out.println("Check all nodes for output (this node is also part of the cluster).");
+ }
+
+ /**
+ * Base class for the example closures; it only holds the name of the semaphore the closure works with.
+ */
+ private abstract static class SemaphoreExampleClosure implements IgniteRunnable {
+ /** Name of the semaphore used by the concrete closure. */
+ protected final String semaphoreName;
+
+ /**
+ * @param semaphoreName Semaphore name.
+ */
+ SemaphoreExampleClosure(String semaphoreName) {
+ this.semaphoreName = semaphoreName;
+ }
+ }
+
+ /**
+ * Closure that releases one permit of the shared semaphore per produced item.
+ */
+ private static class Producer extends SemaphoreExampleClosure {
+ /**
+ * @param semaphoreName Name of the shared semaphore.
+ */
+ public Producer(String semaphoreName) {
+ super(semaphoreName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ Ignite ignite = Ignition.ignite();
+ IgniteSemaphore sem = ignite.semaphore(semaphoreName, 0, true, true);
+
+ for (int op = 0; op < OPS_COUNT; op++) {
+ System.out.println("Producer [nodeId=" + ignite.cluster().localNode().id() +
+ ", available=" + sem.availablePermits() + ']');
+
+ // Make one more unit of the shared resource available.
+ sem.release();
+ }
+
+ System.out.println("Producer finished [nodeId=" + ignite.cluster().localNode().id() + ']');
+
+ // Look up the synchronization semaphore and tell the master node this producer is done.
+ IgniteSemaphore sync = ignite.semaphore(SEM_NAME, 0, true, true);
+
+ sync.release();
+ }
+ }
+
+ /**
+ * Closure that acquires one permit of the shared semaphore per consumed item.
+ */
+ private static class Consumer extends SemaphoreExampleClosure {
+ /**
+ * @param semaphoreName Name of the shared semaphore.
+ */
+ public Consumer(String semaphoreName) {
+ super(semaphoreName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgniteSemaphore sem = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+
+ for (int i = 0; i < OPS_COUNT; i++) {
+ // Block if no permits are available.
+ sem.acquire();
+
+ System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+ ", available=" + sem.availablePermits() + ']');
+ }
+
+ System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
+
+ // Look up the synchronization semaphore; count is 0, the same as in main() and Producer.
+ IgniteSemaphore sync = Ignition.ignite().semaphore(SEM_NAME, 0, true, true);
+
+ // Signals the master thread.
+ sync.release();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index 79f404a..c11fa1a 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicReferenceExample;
import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample;
import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample;
import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample;
+import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample;
import org.apache.ignite.examples.datastructures.IgniteQueueExample;
import org.apache.ignite.examples.datastructures.IgniteSetExample;
import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
@@ -84,6 +85,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
/**
* @throws Exception If failed.
*/
+ public void testCacheSemaphoreExample() throws Exception {
+ IgniteSemaphoreExample.main(EMPTY_ARGS);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCacheQueueExample() throws Exception {
IgniteQueueExample.main(EMPTY_ARGS);
}
@@ -150,4 +158,4 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
public void testCacheContinuousQueryExample() throws Exception {
CacheContinuousQueryExample.main(EMPTY_ARGS);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index fc9cf06..17221ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -430,6 +430,23 @@ public interface Ignite extends AutoCloseable {
throws IgniteException;
/**
+ * Gets or creates semaphore. If semaphore is not found in cache and {@code create} flag
+ * is {@code true}, it is created using provided name and count parameter.
+ *
+ * @param name Name of the semaphore.
+ * @param cnt Count for new semaphore creation. Ignored if {@code create} flag is {@code false}.
+ * @param failoverSafe {@code True} to create failover safe semaphore which means that
+ * if any node leaves topology, permits already acquired by that node are silently released
+ * and become available for alive nodes to acquire. If flag is {@code false} then
+ * all threads waiting for available permits get interrupted.
+ * @param create Boolean flag indicating whether data structure should be created if it does not exist.
+ * @return Semaphore for the given name.
+ * @throws IgniteException If semaphore could not be fetched or created.
+ */
+ public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create)
+ throws IgniteException;
+
+ /**
* Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
* {@code null}.
* If queue is present already, queue properties will not be changed. Use
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
new file mode 100644
index 0000000..db748b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * Instance of cache semaphore can be created by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ */
+public interface IgniteSemaphore extends Closeable {
+ /**
+ * Gets name of the semaphore.
+ *
+ * @return Name of the semaphore.
+ */
+ public String name();
+
+ /**
+ * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain
+ * Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+ * one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this
+ * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+ * Thread#interrupt interrupts} the current thread. </ul>
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+ * and the current thread's interrupted status is cleared.
+ *
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ */
+ public void acquire() throws IgniteInterruptedException;
+
+ /**
+ * Acquires a permit from this semaphore, blocking until one is available.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+ * one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is
+ * next to be assigned a permit.
+ *
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will
+ * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would
+ * have received the permit had no interruption occurred. When the thread does return from this method its
+ * interrupt status will be set.
+ */
+ public void acquireUninterruptibly();
+
+ /**
+ * Acquires a permit from this semaphore, only if one is available at the time of invocation.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+ * number of available permits by one.
+ *
+ * <p>If no permit is available then this method will return immediately with the value {@code false}.
+ *
+ * @return {@code true} if a permit was acquired and {@code false} otherwise
+ */
+ public boolean tryAcquire();
+
+ /**
+ * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current
+ * thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+ * number of available permits by one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for
+ * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+ * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul>
+ *
+ * <p>If a permit is acquired then the value {@code true} is returned.
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is
+ * thrown and the current thread's interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all.
+ *
+ * @param timeout the maximum time to wait for a permit
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was
+ * acquired
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ */
+ public boolean tryAcquire(long timeout, TimeUnit unit)
+ throws IgniteInterruptedException;
+
+ /**
+ * Acquires the given number of permits from this semaphore, blocking until all are available.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+ * available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this
+ * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this
+ * request.
+ *
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will
+ * continue to wait and its position in the queue is not affected. When the thread does return from this method its
+ * interrupt status will be set.
+ *
+ * @param permits the number of permits to acquire
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public void acquireUninterruptibly(int permits);
+
+ /**
+ * Returns the current number of permits available in this semaphore.
+ *
+ * <p>This method is typically used for debugging and testing purposes.
+ *
+ * @return the number of permits available in this semaphore
+ */
+ public int availablePermits();
+
+ /**
+ * Acquires and returns all permits that are immediately available.
+ *
+ * @return the number of permits acquired
+ */
+ public int drainPermits();
+
+ /**
+ * Releases a permit, returning it to the semaphore.
+ *
+ * <p>Releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a
+ * permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread
+ * scheduling purposes.
+ *
+ * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+ * #acquire}. Correct usage of a semaphore is established by programming convention in the application.
+ */
+ public void release();
+
+ /**
+ * Acquires the given number of permits from this semaphore, if all become available within the given waiting time
+ * and the current thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code
+ * true}, reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link
+ * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number
+ * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread; or <li>The specified waiting time elapses. </ul>
+ *
+ * <p>If the permits are acquired then the value {@code true} is returned.
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException}
+ * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this
+ * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made
+ * available by a call to {@link #release()}.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all. Any permits that were to be assigned to this thread, are instead
+ * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to
+ * {@link #release()}.
+ *
+ * @param permits the number of permits to acquire
+ * @param timeout the maximum time to wait for the permits
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all
+ * permits were acquired
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+ throws IgniteInterruptedException;
+
+ /**
+ * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code
+ * true}, reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then this method will return immediately with the value {@code false}
+ * and the number of available permits is unchanged.
+ *
+ * <p>If you want to honor the failoverSafe setting, then use {@link #tryAcquire(int, long, TimeUnit)
+ * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
+ *
+ * @param permits the number of permits to acquire
+ * @return {@code true} if the permits were acquired and {@code false} otherwise
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public boolean tryAcquire(int permits);
+
+ /**
+ * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is
+ * {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+ * available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release()
+ * release} methods for this semaphore, the current thread is next to be assigned permits and the number of
+ * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+ * current thread. </ul>
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+ * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are
+ * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to
+ * {@link #release()}.
+ *
+ * @param permits the number of permits to acquire
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public void acquire(int permits) throws IgniteInterruptedException;
+
+ /**
+ * Releases the given number of permits, returning them to the semaphore.
+ *
+ * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
+ * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the
+ * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling
+ * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits
+ * available after this thread's request has been satisfied, then those permits are assigned in turn to other
+ * threads trying to acquire permits.
+ *
+ * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+ * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the
+ * application.
+ *
+ * @param permits the number of permits to release
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public void release(int permits);
+
+ /**
+ * Returns {@code true} if this semaphore is safe to use after node failure.
+ * If not, IgniteInterruptedException is thrown on every other node after node failure.
+ *
+ * @return {@code true} if this semaphore has failoverSafe set true
+ */
+ public boolean isFailoverSafe();
+
+ /**
+ * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a
+ * {@code true} return does not guarantee that any other thread will ever acquire. This method is designed
+ * primarily for use in monitoring of the system state.
+ *
+ * @return {@code true} if there may be other threads waiting to acquire permits from this semaphore
+ */
+ public boolean hasQueuedThreads();
+
+ /**
+ * Returns an estimate of the number of nodes waiting to acquire. The value is only an estimate because the number
+ * of nodes that are waiting may change dynamically while this method traverses internal data structures. This
+ * method is designed for use in monitoring of the system state, not for synchronization control.
+ *
+ * @return the estimated number of nodes waiting to acquire permits from this semaphore
+ */
+ public int getQueueLength();
+
+ /**
+ * Gets {@code broken} status of the semaphore.
+ *
+ * @return {@code True} if a node failed on this semaphore and failoverSafe flag was set to false, {@code false} otherwise.
+ */
+ public boolean isBroken();
+
+ /**
+ * Gets {@code removed} status of the semaphore.
+ *
+ * @return {@code True} if semaphore was removed from cache, {@code false} otherwise.
+ */
+ public boolean removed();
+
+ /**
+ * Removes this semaphore.
+ *
+ * @throws IgniteException If operation failed.
+ */
+ @Override public void close();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
index 49c4f6e..09f23bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
@@ -113,9 +113,9 @@ public class DiscoveryEvent extends EventAdapter {
/**
* Gets node that caused this event to be generated. It is potentially different from the node
* on which this event was recorded. For example, node {@code A} locally recorded the event that a remote node
- * {@code B} joined the topology. In this case this method will return ID of {@code B}.
+ * {@code B} joined the topology. In this case this method will return node {@code B}.
*
- * @return Event node ID.
+ * @return Event node.
*/
public ClusterNode eventNode() {
return evtNode;
@@ -162,4 +162,4 @@ public class DiscoveryEvent extends EventAdapter {
"type", name(),
"tstamp", timestamp());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f1d67af..02096dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -58,6 +58,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
@@ -2936,6 +2937,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteSemaphore semaphore(
+ String name,
+ int cnt,
+ boolean failoverSafe,
+ boolean create
+ ) {
+ guard();
+
+ try {
+ return ctx.dataStructures().semaphore(name, cnt, failoverSafe, create);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg)
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 810bd8c..b532d7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -45,16 +45,20 @@ import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.CacheType;
@@ -82,12 +86,15 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_LONG;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_REF;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_SEQ;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -131,13 +138,16 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** Cache contains only {@code GridCacheCountDownLatchValue}. */
private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView;
+ /** Cache contains only {@code GridCacheSemaphoreState}. */
+ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView;
+
/** Cache contains only {@code GridCacheAtomicReferenceValue}. */
private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
/** Cache contains only {@code GridCacheAtomicStampedValue}. */
private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue> atomicStampedView;
- /** Cache contains only entry {@code GridCacheSequenceValue}. */
+ /** Cache contains only entry {@code GridCacheSequenceValue}. */
private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView;
/** Cache context for atomic data structures. */
@@ -167,6 +177,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ super.start();
+
+ // Listen for nodes leaving or failing so that local semaphores can react to lost permit holders.
+ ctx.event().addLocalEventListener(
+ new GridLocalEventListener() {
+ @Override public void onEvent(final Event evt) {
+ // Handling the event may require a cache operation to execute,
+ // therefore the event notification thread cannot be used.
+ ctx.closure().callLocalSafe(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ // The listener is registered only for EVT_NODE_LEFT and EVT_NODE_FAILED (see below).
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID leftNodeId = discoEvt.eventNode().id();
+
+ // Notify every semaphore registered in the local data structures map.
+ for (GridCacheRemovable ds : dsMap.values()) {
+ if (ds instanceof GridCacheSemaphoreEx)
+ ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId);
+ }
+
+ return null;
+ }
+ },
+ false);
+ }
+ },
+ EVT_NODE_LEFT,
+ EVT_NODE_FAILED);
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
if (ctx.config().isDaemon())
@@ -187,6 +228,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
cntDownLatchView = atomicsCache;
+ semView = atomicsCache;
+
atomicLongView = atomicsCache;
atomicRefView = atomicsCache;
@@ -262,7 +305,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
*
* @param name Sequence name.
* @param initVal Initial value for sequence. If sequence already cached, {@code initVal} will be ignored.
- * @param create If {@code true} sequence will be created in case it is not in cache.
+ * @param create If {@code true} sequence will be created in case it is not in cache.
* @return Sequence.
* @throws IgniteCheckedException If loading failed.
*/
@@ -1194,6 +1237,124 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
+ * Gets or creates a semaphore. If the semaphore state is not found in the cache and {@code create}
+ * is {@code true}, a new state is stored using the provided count and failover-safety flag.
+ *
+ * @param name Name of the semaphore.
+ * @param cnt Initial count of permits; used only when the semaphore is created by this call.
+ * @param failoverSafe {@code True} if permits held by a node that leaves the grid should be released
+ * so that other nodes can acquire them; {@code false} if the semaphore should become broken instead.
+ * Used only when the semaphore is created by this call.
+ * @param create If {@code true} semaphore will be created in case it is not in cache,
+ * if it is {@code false} all parameters except {@code name} are ignored.
+ * @return Semaphore for the given name or {@code null} if it is not found and
+ * {@code create} is false.
+ * @throws IgniteCheckedException If operation failed.
+ */
+ public IgniteSemaphore semaphore(final String name, final int cnt, final boolean failoverSafe, final boolean create)
+ throws IgniteCheckedException {
+ A.notNull(name, "name");
+
+ awaitInitialization();
+
+ checkAtomicsConfiguration();
+
+ startQuery();
+
+ return getAtomic(new IgniteOutClosureX<IgniteSemaphore>() {
+ @Override public IgniteSemaphore applyx() throws IgniteCheckedException {
+ GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+ dsCacheCtx.gate().enter();
+
+ try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ // Read the stored state inside the transaction before consulting the local map.
+ GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+ // Check that semaphore hasn't been created in other thread yet.
+ GridCacheSemaphoreEx sem = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
+
+ if (sem != null) {
+ assert val != null;
+
+ return sem;
+ }
+
+ if (val == null && !create)
+ return null;
+
+ if (val == null) {
+ // No state in the cache yet: store the initial state with an empty per-node permit map.
+ val = new GridCacheSemaphoreState(cnt, new HashMap<UUID, Integer>(), failoverSafe);
+
+ dsView.put(key, val);
+ }
+
+ GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl(
+ name,
+ key,
+ semView,
+ dsCacheCtx);
+
+ // The local instance is registered before the commit; the catch block below removes it on failure.
+ dsMap.put(key, sem0);
+
+ tx.commit();
+
+ return sem0;
+ }
+ catch (Error | Exception e) {
+ dsMap.remove(key);
+
+ U.error(log, "Failed to create semaphore: " + name, e);
+
+ throw e;
+ }
+ finally {
+ dsCacheCtx.gate().leave();
+ }
+ }
+ }, new DataStructureInfo(name, SEMAPHORE, null), create, GridCacheSemaphoreEx.class);
+ }
+
+ /**
+ * Removes semaphore state from the cache. If no state is stored under the given name,
+ * the transaction is marked rollback-only and nothing is removed.
+ *
+ * @param name Name of the semaphore.
+ * @throws IgniteCheckedException If operation failed or the stored permit count is negative.
+ */
+ public void removeSemaphore(final String name) throws IgniteCheckedException {
+ assert name != null;
+ assert dsCacheCtx != null;
+
+ awaitInitialization();
+
+ removeDataStructure(new IgniteOutClosureX<Void>() {
+ @Override public Void applyx() throws IgniteCheckedException {
+ GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+ dsCacheCtx.gate().enter();
+
+ try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ // Check correctness type of removable object.
+ GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+ if (val != null) {
+ // NOTE(review): a negative count is treated as "threads are blocked". The acquire path in
+ // GridCacheSemaphoreImpl does not appear to store a negative count, so this check may only
+ // fire for semaphores created with a negative initial count - confirm.
+ if (val.getCount() < 0)
+ throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
+
+ dsView.remove(key);
+
+ tx.commit();
+ }
+ else
+ tx.setRollbackOnly();
+
+ return null;
+ }
+ finally {
+ dsCacheCtx.gate().leave();
+ }
+ }
+ }, name, SEMAPHORE, null);
+ }
+
+ /**
* Remove internal entry by key from cache.
*
* @param key Internal entry key.
@@ -1240,7 +1401,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
- return evt.getValue() instanceof GridCacheCountDownLatchValue;
+ return evt.getValue() instanceof GridCacheCountDownLatchValue ||
+ evt.getValue() instanceof GridCacheSemaphoreState;
else {
assert evt.getEventType() == EventType.REMOVED : evt;
@@ -1318,6 +1480,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
", actual=" + latch.getClass() + ", value=" + latch + ']');
}
}
+ else if (val0 instanceof GridCacheSemaphoreState) {
+ GridCacheInternalKey key = evt.getKey();
+
+ // Notify semaphore on changes.
+ final GridCacheRemovable sem = dsMap.get(key);
+
+ GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0;
+
+ if (sem instanceof GridCacheSemaphoreEx) {
+ final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)sem;
+
+ semaphore0.onUpdate(val);
+ }
+ else if (sem != null) {
+ U.error(log, "Failed to cast object " +
+ "[expected=" + IgniteSemaphore.class.getSimpleName() +
+ ", actual=" + sem.getClass() + ", value=" + sem + ']');
+ }
+ }
}
else {
@@ -1407,7 +1588,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* @return Removed value.
*/
@SuppressWarnings("unchecked")
- @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key) throws IgniteCheckedException {
+ @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key)
+ throws IgniteCheckedException {
return retry(log, new Callable<T>() {
@Nullable @Override public T call() throws Exception {
return (T)cache.getAndRemove(key);
@@ -1432,7 +1614,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
catch (ClusterGroupEmptyCheckedException e) {
throw new IgniteCheckedException(e);
}
- catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
+ catch (IgniteTxRollbackCheckedException |
+ CachePartialUpdateCheckedException |
+ ClusterTopologyCheckedException e) {
if (cnt++ == MAX_UPDATE_RETRIES)
throw e;
else {
@@ -1535,7 +1719,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
QUEUE(IgniteQueue.class.getSimpleName()),
/** */
- SET(IgniteSet.class.getSimpleName());
+ SET(IgniteSet.class.getSimpleName()),
+
+ /** */
+ SEMAPHORE(IgniteSemaphore.class.getSimpleName());
/** */
private static final DataStructureType[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
new file mode 100644
index 0000000..4d39635
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteSemaphore;
+
+/**
+ * Grid cache semaphore ({@code 'Ex'} stands for external).
+ * <p>
+ * Extends the public {@link IgniteSemaphore} API with the callbacks that
+ * {@code DataStructuresProcessor} uses to push cache-state updates and topology changes
+ * to a local semaphore instance.
+ */
+public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
+ /**
+ * Get current semaphore key.
+ *
+ * @return Semaphore key.
+ */
+ public GridCacheInternalKey key();
+
+ /**
+ * Callback to notify semaphore on changes of its state stored in the cache.
+ *
+ * @param val State containing the number of available permits.
+ */
+ public void onUpdate(GridCacheSemaphoreState val);
+
+ /**
+ * Callback to notify semaphore on topology changes. Invoked for node-left and node-failed
+ * discovery events.
+ *
+ * @param nodeId Id of the node that left the grid.
+ */
+ public void onNodeRemoved(UUID nodeId);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
new file mode 100644
index 0000000..37df9d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache semaphore implementation based on {@link AbstractQueuedSynchronizer}.
+ * The current implementation supports only unfair semaphores.
+ * If a node fails after acquiring permits on the cache semaphore, the behavior is controlled by the
+ * {@code failoverSafe} parameter. If it is {@code true}, other nodes can reacquire the permits that were held by the failed node.
+ * If it is {@code false}, the semaphore becomes broken and {@link IgniteInterruptedException} is thrown on every node waiting on this semaphore.
+ */
+public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Deserialization stash. */
+ private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
+ new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+ @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+ return F.t2();
+ }
+ };
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Semaphore name. */
+ private String name;
+
+ /** Removed flag. */
+ private volatile boolean rmvd;
+
+ /** Semaphore key. */
+ private GridCacheInternalKey key;
+
+ /** Semaphore projection. */
+ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView;
+
+ /** Cache context. */
+ private GridCacheContext ctx;
+
+ /** Initialization guard. */
+ private final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Initialization latch. */
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+
+ /** Internal synchronization object. */
+ private Sync sync;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridCacheSemaphoreImpl() {
+ // No-op.
+ }
+
+ /**
+ * Synchronization implementation for semaphore.
+ * Uses AQS state to represent permits.
+ */
+ final class Sync extends AbstractQueuedSynchronizer {
+ private static final long serialVersionUID = 1192457210091910933L;
+
+ /** Map containing number of acquired permits for each node waiting on this semaphore. */
+ private Map<UUID, Integer> nodeMap;
+
+ /** Flag indicating that it is safe to continue after node that acquired semaphore fails. */
+ final boolean failoverSafe;
+
+ /** Flag indicating that a node failed and it is not safe to continue using this semaphore. */
+ protected boolean broken = false;
+
+ protected Sync(int permits, Map<UUID, Integer> waiters, boolean failoverSafe) {
+ setState(permits);
+ nodeMap = waiters;
+ this.failoverSafe = failoverSafe;
+ }
+
+ /**
+ * Sets a map containing number of permits acquired by each node using this semaphore. This method should only
+ * be called in {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+ *
+ * @param nodeMap NodeMap.
+ */
+ protected synchronized void setWaiters(Map<UUID, Integer> nodeMap) {
+ this.nodeMap = nodeMap;
+ }
+
+ /**
+ * Gets the number of nodes waiting at this semaphore.
+ *
+ * @return Number of nodes waiting at this semaphore.
+ */
+ public int getWaiters() {
+ int totalWaiters = 0;
+
+ for (Integer i : nodeMap.values()) {
+ if (i > 0)
+ totalWaiters++;
+ }
+
+ return totalWaiters;
+ }
+
+ /**
+ * Get number of permits for node.
+ *
+ * @param nodeID Node ID.
+ * @return Number of permits node has acquired at this semaphore. Can be less than 0 if
+ * more permits were released than acquired on node.
+ */
+ public int getPermitsForNode(UUID nodeID){
+ return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0;
+ }
+
+ /**
+ * Sets the number of permits currently available on this semaphore. This method should only be used in
+ * {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+ *
+ * @param permits Number of permits available at this semaphore.
+ */
+ final synchronized void setPermits(int permits) {
+ setState(permits);
+ }
+
+ /**
+ * Gets the number of permissions currently available.
+ *
+ * @return Number of permits available at this semaphore.
+ */
+ final int getPermits() {
+ return getState();
+ }
+
+ /**
+ * This method is used by the AQS to test if the current thread should block or not.
+ *
+ * @param acquires Number of permits to acquire.
+ * @return Negative number if thread should block, positive if thread successfully acquires permits.
+ */
+ final int nonfairTryAcquireShared(int acquires) {
+ for (;;) {
+ int available = getState();
+
+ int remaining = available - acquires;
+
+ if (remaining < 0 || compareAndSetGlobalState(available, remaining, false)) {
+ return remaining;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int tryAcquireShared(int acquires) {
+ return nonfairTryAcquireShared(acquires);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected final boolean tryReleaseShared(int releases) {
+ // Check if some other node updated the state.
+ // This method is called with release==0 only when trying to wake through update.
+ if (releases == 0)
+ return true;
+
+ for (;;) {
+ int cur = getState();
+
+ int next = cur + releases;
+
+ if (next < cur) // overflow
+ throw new Error("Maximum permit count exceeded");
+
+ if (compareAndSetGlobalState(cur, next, false))
+ return true;
+ }
+ }
+
+ /**
+ * This method is used internally to implement {@linkplain GridCacheSemaphoreImpl#drainPermits()}.
+ *
+ * @return Number of permits to drain.
+ */
+ final int drainPermits() {
+ for (;;) {
+
+ int current = getState();
+
+ if (current == 0 || compareAndSetGlobalState(current, 0, true))
+ return current;
+ }
+ }
+
+ /**
+ * This method is used for synchronizing the semaphore state across all nodes.
+ *
+ * @param expVal Expected number of permits.
+ * @param newVal New number of permits.
+ * @param draining True if used for draining the permits.
+ * @return True if this is the call that succeeded to change the global state.
+ */
+ protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) {
+ try {
+ return CU.outTx(
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+ semView,
+ PESSIMISTIC, REPEATABLE_READ)
+ ) {
+ GridCacheSemaphoreState val = semView.get(key);
+
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find semaphore with given name: " +
+ name);
+
+ boolean retVal = val.getCount() == expVal;
+
+ if (retVal) {
+ // If this is not a call to drain permits,
+ // Modify global permission count for the calling node.
+ if (!draining) {
+ UUID nodeID = ctx.localNodeId();
+
+ Map<UUID,Integer> map = val.getWaiters();
+
+ int waitingCnt = expVal - newVal;
+
+ if(map.containsKey(nodeID))
+ waitingCnt += map.get(nodeID);
+
+ map.put(nodeID, waitingCnt);
+
+ val.setWaiters(map);
+ }
+
+ val.setCount(newVal);
+
+ semView.put(key, val);
+
+ tx.commit();
+ }
+
+ return retVal;
+ }
+ catch (Error | Exception e) {
+ if (!ctx.kernalContext().isStopping())
+ U.error(log, "Failed to compare and set: " + this, e);
+
+ throw e;
+ }
+ }
+ }),
+ ctx
+ );
+ }
+ catch (IgniteCheckedException e) {
+ if (ctx.kernalContext().isStopping()) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring failure in semaphore on node left handler (node is stopping): " + e);
+
+ return true;
+ }
+ else
+ throw U.convertException(e);
+ }
+ }
+
+        /**
+         * Releases the permits recorded for a node that left the grid: the permits are added back to the
+         * global count and the node's entry is removed from the waiters map, all in one transaction.
+         *
+         * @param nodeId ID of the failing node.
+         * @return {@code True} if this call changed the global state, or if the failure was ignored because
+         *      the local node is stopping; {@code false} if the failed node has no entry in the waiters map.
+         */
+        protected boolean releaseFailedNode(final UUID nodeId) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (
+                                IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                    semView,
+                                    PESSIMISTIC, REPEATABLE_READ)
+                            ) {
+                                GridCacheSemaphoreState val = semView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " +
+                                        name);
+
+                                Map<UUID, Integer> map = val.getWaiters();
+
+                                // Single lookup instead of containsKey() + get().
+                                Integer numPermits = map.get(nodeId);
+
+                                if (numPermits == null) {
+                                    // Nothing recorded for this node (possibly already released by another node).
+                                    tx.rollback();
+
+                                    return false;
+                                }
+
+                                // Only a positive balance is given back; a non-positive one is just dropped.
+                                if (numPermits > 0)
+                                    val.setCount(val.getCount() + numPermits);
+
+                                map.remove(nodeId);
+
+                                val.setWaiters(map);
+
+                                semView.put(key, val);
+
+                                sync.nodeMap = map;
+
+                                tx.commit();
+
+                                return true;
+                            }
+                            catch (Error | Exception e) {
+                                if (!ctx.kernalContext().isStopping())
+                                    U.error(log, "Failed to release permits of failed node [nodeId=" + nodeId +
+                                        ", semaphore=" + name + ']', e);
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                if (ctx.kernalContext().isStopping()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignoring failure in semaphore on node left handler (node is stopping): " + e);
+
+                    return true;
+                }
+                else
+                    throw U.convertException(e);
+            }
+        }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Semaphore name.
+ * @param key Semaphore key.
+ * @param semView Semaphore projection.
+ * @param ctx Cache context.
+ */
+ public GridCacheSemaphoreImpl(
+ String name,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView,
+ GridCacheContext ctx
+ ) {
+ assert name != null;
+ assert key != null;
+ assert semView != null;
+ assert ctx != null;
+
+ this.name = name;
+ this.key = key;
+ this.semView = semView;
+ this.ctx = ctx;
+
+ log = ctx.logger(getClass());
+ }
+
+    /**
+     * Lazily creates the internal {@link Sync} object from the semaphore state stored in the cache.
+     * The first caller performs the initialization; concurrent and later callers wait on the
+     * initialization latch.
+     *
+     * @throws IgniteCheckedException If operation failed or the semaphore state could not be found,
+     *      leaving the internal sync object uninitialized.
+     */
+    private void initializeSemaphore() throws IgniteCheckedException {
+        if (!initGuard.get() && initGuard.compareAndSet(false, true)) {
+            try {
+                sync = CU.outTx(
+                    retryTopologySafe(new Callable<Sync>() {
+                        @Override public Sync call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                semView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semView.get(key);
+
+                                if (val == null) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find semaphore with given name: " + name);
+
+                                    return null;
+                                }
+
+                                final int count = val.getCount();
+
+                                Map<UUID, Integer> waiters = val.getWaiters();
+
+                                final boolean failoverSafe = val.isFailoverSafe();
+
+                                tx.commit();
+
+                                return new Sync(count, waiters, failoverSafe);
+                            }
+                        }
+                    }),
+                    ctx
+                );
+
+                if (log.isDebugEnabled())
+                    log.debug("Initialized internal sync structure: " + sync);
+            }
+            finally {
+                // Always release waiters, even if initialization failed.
+                initLatch.countDown();
+            }
+        }
+        else
+            U.await(initLatch);
+
+        // Checked for the initializing caller as well: without this the first caller would hit
+        // a NullPointerException on 'sync' when the state is missing from the cache.
+        if (sync == null)
+            throw new IgniteCheckedException("Internal semaphore has not been properly initialized.");
+    }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheInternalKey key() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return rmvd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onRemoved() {
+ return rmvd = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdate(GridCacheSemaphoreState val) {
+ if (sync == null)
+ return;
+
+ // Update permission count.
+ sync.setPermits(val.getCount());
+
+ // Update waiters' counts.
+ sync.setWaiters(val.getWaiters());
+
+ // Try to notify any waiting threads.
+ sync.releaseShared(0);
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Does nothing if this instance has not been initialized yet or if the removed node holds no
+     * permits. Otherwise either releases the node's permits (failover-safe semaphore) or marks the
+     * semaphore broken and interrupts all local waiters.
+     */
+    @Override public void onNodeRemoved(UUID nodeId) {
+        Sync sync0 = sync;
+
+        // The discovery listener notifies every registered semaphore, including ones that were never
+        // used on this node and therefore have no sync object yet (same guard as in onUpdate()).
+        if (sync0 == null)
+            return;
+
+        if (sync0.getPermitsForNode(nodeId) <= 0)
+            return;
+
+        if (sync0.failoverSafe)
+            // Release permits acquired by threads on failing node.
+            sync0.releaseFailedNode(nodeId);
+        else {
+            // Interrupt every waiting thread if this semaphore is not failover safe.
+            sync0.broken = true;
+
+            for (Thread t : sync0.getSharedQueuedThreads())
+                t.interrupt();
+
+            // Try to notify any waiting threads.
+            sync0.releaseShared(0);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override public void needCheckNotRemoved() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acquire() throws IgniteInterruptedException {
+ acquire(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acquire(int permits) throws IgniteInterruptedException {
+ A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+ try {
+ initializeSemaphore();
+
+ if(isBroken())
+ Thread.currentThread().interrupt();
+
+ sync.acquireSharedInterruptibly(permits);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly() {
+ try {
+ initializeSemaphore();
+
+ sync.acquireShared(1);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly(int permits) {
+ A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+ try {
+ initializeSemaphore();
+
+ sync.acquireShared(permits);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int availablePermits() {
+ int ret;
+ try {
+ initializeSemaphore();
+
+ ret = CU.outTx(
+ retryTopologySafe(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ try (
+ IgniteInternalTx tx = CU.txStartInternal(ctx,
+ semView, PESSIMISTIC, REPEATABLE_READ)
+ ) {
+ GridCacheSemaphoreState val = semView.get(key);
+
+ if (val == null)
+ throw new IgniteException("Failed to find semaphore with given name: " + name);
+
+ int count = val.getCount();
+
+ tx.rollback();
+
+ return count;
+ }
+ }
+ }),
+ ctx
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ return ret;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int drainPermits() {
+ try {
+ initializeSemaphore();
+
+ return sync.drainPermits();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire() {
+ try {
+ initializeSemaphore();
+
+ return sync.nonfairTryAcquireShared(1) >= 0;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+ try {
+ initializeSemaphore();
+
+ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ release(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release(int permits) {
+ A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+ try {
+ initializeSemaphore();
+
+ sync.releaseShared(permits);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits) {
+ A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+ try {
+ initializeSemaphore();
+
+ return sync.nonfairTryAcquireShared(permits) >= 0;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+ A.ensure(permits >= 0, "Number of permits must be non-negative.");
+ try {
+ initializeSemaphore();
+
+ return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isFailoverSafe() {
+ return sync.failoverSafe;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasQueuedThreads() {
+ try {
+ initializeSemaphore();
+
+ return sync.getWaiters() != 0;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getQueueLength() {
+ try {
+ initializeSemaphore();
+
+ return sync.getWaiters();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isBroken(){
+ return sync.broken;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx.kernalContext());
+ out.writeUTF(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ t.set1((GridKernalContext)in.readObject());
+ t.set2(in.readUTF());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ if (!rmvd) {
+ try {
+ ctx.kernalContext().dataStructures().removeSemaphore(name);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheSemaphoreImpl.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
new file mode 100644
index 0000000..50cdf10
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Grid cache semaphore state.
+ */
+public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Permission count. */
+ private int cnt;
+
+ /** Map containing number of acquired permits for each node waiting on this semaphore. */
+ @GridToStringInclude
+ private Map<UUID, Integer> waiters;
+
+ /** FailoverSafe flag. */
+ private boolean failoverSafe;
+
+ /**
+ * Constructor.
+ *
+ * @param cnt Number of permissions.
+ * @param waiters Waiters map.
+ * @param failoverSafe Failover safe flag.
+ */
+ public GridCacheSemaphoreState(int cnt, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) {
+ this.cnt = cnt;
+ this.waiters = waiters;
+ this.failoverSafe = failoverSafe;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridCacheSemaphoreState() {
+ // No-op.
+ }
+
+ /**
+ * @param cnt New count.
+ */
+ public void setCount(int cnt) {
+ this.cnt = cnt;
+ }
+
+ /**
+ * @return Current count.
+ */
+ public int getCount() {
+ return cnt;
+ }
+
+ /**
+ * @return Waiters.
+ */
+ public Map<UUID,Integer> getWaiters() {
+ return waiters;
+ }
+
+ /**
+ * @param waiters Map containing the number of permissions acquired by each node.
+ */
+ public void setWaiters(Map<UUID, Integer> waiters) {
+ this.waiters = waiters;
+ }
+
+ /**
+ * @return failoverSafe flag.
+ */
+ public boolean isFailoverSafe() {
+ return failoverSafe;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(cnt);
+ out.writeBoolean(failoverSafe);
+ out.writeBoolean(waiters != null);
+
+ if (waiters != null) {
+ out.writeInt(waiters.size());
+
+ for (Map.Entry<UUID, Integer> e : waiters.entrySet()) {
+ U.writeUuid(out, e.getKey());
+ out.writeInt(e.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException {
+ cnt = in.readInt();
+ failoverSafe = in.readBoolean();
+
+ if (in.readBoolean()) {
+ int size = in.readInt();
+
+ waiters = U.newHashMap(size);
+
+ for (int i = 0; i < size; i++)
+ waiters.put(U.readUuid(in), in.readInt());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheSemaphoreState.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 55dbb57..c46b5c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.testframework.GridTestUtils;
@@ -675,4 +676,45 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(srvLatch.await(1000));
assertTrue(clientLatch.await(1000));
}
-}
\ No newline at end of file
+
+ /**
+ * Checks that a semaphore obtained under the same name on a client node and on a server node reports
+ * the same number of available permits on both sides, including after the client node reconnects.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true);
+
+ assertEquals(3, clientSemaphore.availablePermits());
+
+ final IgniteSemaphore srvSemaphore = srv.semaphore("semaphore1", 3, false, false);
+
+ assertEquals(3, srvSemaphore.availablePermits());
+
+ // One permit is taken on the server side from the callback passed to the reconnect helper.
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvSemaphore.acquire();
+ }
+ });
+
+ assertEquals(2, srvSemaphore.availablePermits());
+ assertEquals(2, clientSemaphore.availablePermits());
+
+ srvSemaphore.acquire();
+
+ assertEquals(1, srvSemaphore.availablePermits());
+ assertEquals(1, clientSemaphore.availablePermits());
+
+ clientSemaphore.acquire();
+
+ assertEquals(0, srvSemaphore.availablePermits());
+ assertEquals(0, clientSemaphore.availablePermits());
+
+ // With no permits left, a non-blocking acquire must fail on both the server and the client side.
+ assertFalse(srvSemaphore.tryAcquire());
+ assertFalse(clientSemaphore.tryAcquire());
+ }
+}