You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/10/23 18:10:42 UTC
[1/6] ignite git commit: Implements distributed semaphore ignite-638
Repository: ignite
Updated Branches:
refs/heads/ignite-638 [created] 5f38a18e1
Implements distributed semaphore ignite-638
This interface provides a rich API for working with distributed semaphore.
Distributed semaphore provides functionality similar to java.util.concurrent.Semaphore.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9567ade
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9567ade
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9567ade
Branch: refs/heads/ignite-638
Commit: e9567adec4057b0a0528735d4934440f4cd9fee3
Parents: b08e0b2
Author: vladisav <vl...@gmail.com>
Authored: Fri Sep 25 11:14:18 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 1 04:26:48 2015 +0200
----------------------------------------------------------------------
.../datastructures/IgniteSemaphoreExample.java | 183 ++++++
.../src/main/java/org/apache/ignite/Ignite.java | 14 +
.../java/org/apache/ignite/IgniteSemaphore.java | 390 ++++++++++++
.../apache/ignite/internal/IgniteKernal.java | 19 +
.../datastructures/DataStructuresProcessor.java | 161 ++++-
.../datastructures/GridCacheSemaphoreEx.java | 22 +
.../datastructures/GridCacheSemaphoreImpl.java | 619 +++++++++++++++++++
.../datastructures/GridCacheSemaphoreState.java | 128 ++++
.../datastructures/GridCacheSemaphoreValue.java | 115 ++++
.../ignite/testframework/junits/IgniteMock.java | 10 +
.../junits/multijvm/IgniteProcessProxy.java | 7 +
.../org/apache/ignite/IgniteSpringBean.java | 11 +
12 files changed, 1677 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
new file mode 100644
index 0000000..2ef242c
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -0,0 +1,183 @@
+package org.apache.ignite.examples.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+
+/**
+ * This example demonstrates cache based semaphore.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public class IgniteSemaphoreExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = IgniteSemaphoreExample.class.getSimpleName();
+
+    /** Number of items for each producer/consumer to produce/consume. */
+    private static final int ITEM_COUNT = 100;
+
+    /** Number of producers. */
+    private static final int NUM_PRODUCERS = 10;
+
+    /** Number of consumers. */
+    private static final int NUM_CONSUMERS = 10;
+
+    /** Synchronization semaphore name. */
+    private static final String SYNC_NAME = IgniteSemaphoreExample.class.getSimpleName();
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     */
+    public static void main(String[] args) {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache atomic semaphore example started.");
+
+            // Semaphore the master node blocks on until every producer and consumer has reported completion.
+            IgniteSemaphore syncSemaphore = ignite.semaphore(SYNC_NAME, 0, false, true);
+
+            // Make name of semaphore.
+            final String semaphoreName = UUID.randomUUID().toString();
+
+            // Make name of mutex.
+            final String mutexName = UUID.randomUUID().toString();
+
+            // Make shared resource.
+            final String resourceName = UUID.randomUUID().toString();
+
+            IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+            // Explicit type argument keeps this compiling on Java 7, where a diamond in
+            // argument position is inferred as LinkedList<Object>.
+            cache.put(resourceName, new LinkedList<String>());
+
+            // Create the item-counting semaphore with no permits. The returned handle is not needed here.
+            ignite.semaphore(semaphoreName, 0, false, true);
+
+            // Create the mutex as a semaphore with a single permit. The returned handle is not needed here.
+            ignite.semaphore(mutexName, 1, false, true);
+
+            // Submit consumer jobs asynchronously.
+            for (int i = 0; i < NUM_CONSUMERS; i++)
+                ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
+
+            // Submit producer jobs asynchronously.
+            for (int i = 0; i < NUM_PRODUCERS; i++)
+                ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
+
+            System.out.println("Master node is waiting for all other nodes to finish...");
+
+            // Wait for everyone to finish: each producer and each consumer releases one permit when done.
+            syncSemaphore.acquire(NUM_CONSUMERS + NUM_PRODUCERS);
+        }
+
+        System.out.flush();
+        System.out.println();
+        System.out.println("Finished semaphore example...");
+        System.out.println("Check all nodes for output (this node is also part of the cluster).");
+    }
+
+    /**
+     * Base class for the example jobs. Carries the names of the item semaphore,
+     * the mutex and the shared cache entry that producers and consumers operate on.
+     */
+    private abstract static class SemaphoreExampleClosure implements IgniteRunnable {
+        /** Semaphore name. */
+        protected final String semaphoreName;
+
+        /** Mutex name. */
+        protected final String mutexName;
+
+        /** Resource name. */
+        protected final String resourceName;
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        SemaphoreExampleClosure(String mutexName, String semaphoreName, String resourceName) {
+            this.semaphoreName = semaphoreName;
+            this.mutexName = mutexName;
+            this.resourceName = resourceName;
+        }
+    }
+
+    /**
+     * Job which adds {@link #ITEM_COUNT} items to the shared queue under the mutex and
+     * releases one permit of the item semaphore for every item added.
+     */
+    private static class Producer extends SemaphoreExampleClosure {
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        public Producer(String mutexName, String semaphoreName, String resourceName) {
+            super(mutexName, semaphoreName, resourceName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Ignite ignite = Ignition.ignite();
+
+            IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, true, true);
+
+            // The count only applies if the mutex does not exist yet; 1 matches the value used by the master node.
+            IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, true, true);
+
+            IgniteCache<String, Queue<String>> cache = ignite.cache(CACHE_NAME);
+
+            for (int i = 0; i < ITEM_COUNT; i++) {
+                mutex.acquire();
+
+                try {
+                    Queue<String> queue = cache.get(resourceName);
+
+                    queue.add(ignite.cluster().localNode().id().toString());
+
+                    cache.put(resourceName, queue);
+
+                    System.out.println("Producer [nodeId=" + ignite.cluster().localNode().id() +
+                        "] produced data. Available: " + semaphore.availablePermits());
+                }
+                finally {
+                    // Always hand the mutex back, otherwise a failure here would block every other job.
+                    mutex.release();
+                }
+
+                // Signal that one more item is available.
+                semaphore.release();
+            }
+
+            System.out.println("Producer [nodeId=" + ignite.cluster().localNode().id() + "] finished. ");
+
+            IgniteSemaphore sync = ignite.semaphore(SYNC_NAME, 0, true, true);
+
+            sync.release();
+        }
+    }
+
+    /**
+     * Job which acquires one permit of the item semaphore per item and then removes
+     * that item from the shared queue under the mutex, {@link #ITEM_COUNT} times.
+     */
+    private static class Consumer extends SemaphoreExampleClosure {
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        public Consumer(String mutexName, String semaphoreName, String resourceName) {
+            super(mutexName, semaphoreName, resourceName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Ignite ignite = Ignition.ignite();
+
+            IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, true, true);
+
+            // The count only applies if the mutex does not exist yet; 1 matches the value used by the master node.
+            IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, true, true);
+
+            IgniteCache<String, Queue<String>> cache = ignite.cache(CACHE_NAME);
+
+            for (int i = 0; i < ITEM_COUNT; i++) {
+                // Wait until a producer has signalled an available item.
+                semaphore.acquire();
+
+                mutex.acquire();
+
+                try {
+                    Queue<String> queue = cache.get(resourceName);
+
+                    String data = queue.remove();
+
+                    cache.put(resourceName, queue);
+
+                    System.out.println("Consumer [nodeId=" + ignite.cluster().localNode().id() +
+                        "] consumed data generated by producer [nodeId=" + data + "]");
+                }
+                finally {
+                    // Always hand the mutex back, otherwise a failure here would block every other job.
+                    mutex.release();
+                }
+            }
+
+            System.out.println("Consumer [nodeId=" + ignite.cluster().localNode().id() + "] finished. ");
+
+            // Same initial count (0) as the master node and the producers use for the sync semaphore.
+            IgniteSemaphore sync = ignite.semaphore(SYNC_NAME, 0, true, true);
+
+            sync.release();
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 0afccd0..dffb126 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -420,6 +420,20 @@ public interface Ignite extends AutoCloseable {
throws IgniteException;
/**
+ * Gets or creates semaphore. If semaphore is not found in cache and {@code create} flag
+ * is {@code true}, it is created using provided name and count parameter.
+ *
+ * @param name Name of the semaphore.
+ * @param cnt Count for new semaphore creation. Ignored if {@code create} flag is {@code false}
+ * or if the semaphore already exists.
+ * @param fair {@code True} to enable fairness.
+ * @param create Boolean flag indicating whether data structure should be created if does not exist.
+ * @return Semaphore for the given name, or {@code null} if it does not exist and
+ * {@code create} is {@code false}.
+ * @throws IgniteException If semaphore could not be fetched or created.
+ */
+ public IgniteSemaphore semaphore(String name, int cnt, boolean fair, boolean create)
+ throws IgniteException;
+
+ /**
* Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
* {@code null}.
* If queue is present already, queue properties will not be changed. Use
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
new file mode 100644
index 0000000..0e29d00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -0,0 +1,390 @@
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * Instance of cache semaphore can be created by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public interface IgniteSemaphore extends Closeable{
+ /**
+ * Gets name of the semaphore.
+ *
+ * @return Name of the semaphore.
+ */
+ public String name();
+
+ /**
+ * Acquires a permit from this semaphore, blocking until one is
+ * available, or the thread is {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately,
+ * reducing the number of available permits by one.
+ *
+ * <p>If no permit is available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until
+ * one of two things happens:
+ * <ul>
+ * <li>Some other thread invokes the {@link #release} method for this
+ * semaphore and the current thread is next to be assigned a permit; or
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread.
+ * </ul>
+ *
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+ * for a permit,
+ * </ul>
+ * then {@link IgniteInterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ */
+ public void acquire() throws IgniteInterruptedException;
+
+ /**
+ * Acquires a permit from this semaphore, blocking until one is
+ * available.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately,
+ * reducing the number of available permits by one.
+ *
+ * <p>If no permit is available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until
+ * some other thread invokes the {@link #release} method for this
+ * semaphore and the current thread is next to be assigned a permit.
+ *
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
+ * while waiting for a permit then it will continue to wait, but the
+ * time at which the thread is assigned a permit may change compared to
+ * the time it would have received the permit had no interruption
+ * occurred. When the thread does return from this method its interrupt
+ * status will be set.
+ */
+ public void acquireUninterruptibly();
+
+ /**
+ * Acquires a permit from this semaphore, only if one is available at the
+ * time of invocation.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately,
+ * with the value {@code true},
+ * reducing the number of available permits by one.
+ *
+ * <p>If no permit is available then this method will return
+ * immediately with the value {@code false}.
+ *
+ * <p>Even when this semaphore has been set to use a
+ * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
+ * immediately acquire a permit if one is available, whether or not
+ * other threads are currently waiting.
+ * This "barging" behavior can be useful in certain
+ * circumstances, even though it breaks fairness. If you want to honor
+ * the fairness setting, then use
+ * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
+ * which is almost equivalent (it also detects interruption).
+ *
+ * @return {@code true} if a permit was acquired and {@code false}
+ * otherwise
+ */
+ public boolean tryAcquire();
+
+ /**
+ * Acquires a permit from this semaphore, if one becomes available
+ * within the given waiting time and the current thread has not
+ * been {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately,
+ * with the value {@code true},
+ * reducing the number of available permits by one.
+ *
+ * <p>If no permit is available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until
+ * one of three things happens:
+ * <ul>
+ * <li>Some other thread invokes the {@link #release} method for this
+ * semaphore and the current thread is next to be assigned a permit; or
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread; or
+ * <li>The specified waiting time elapses.
+ * </ul>
+ *
+ * <p>If a permit is acquired then the value {@code true} is returned.
+ *
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+ * to acquire a permit,
+ * </ul>
+ * then {@link IgniteInterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.
+ *
+ * @param timeout the maximum time to wait for a permit
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if a permit was acquired and {@code false}
+ * if the waiting time elapsed before a permit was acquired
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ */
+ public boolean tryAcquire(long timeout, TimeUnit unit)
+ throws IgniteInterruptedException;
+
+ /**
+ * Acquires the given number of permits from this semaphore,
+ * blocking until all are available.
+ *
+ * <p>Acquires the given number of permits, if they are available,
+ * and returns immediately, reducing the number of available permits
+ * by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until
+ * some other thread invokes one of the {@link #release() release}
+ * methods for this semaphore, the current thread is next to be assigned
+ * permits and the number of available permits satisfies this request.
+ *
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
+ * while waiting for permits then it will continue to wait and its
+ * position in the queue is not affected. When the thread does return
+ * from this method its interrupt status will be set.
+ *
+ * @param permits the number of permits to acquire
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public void acquireUninterruptibly(int permits);
+
+
+ /**
+ * Returns the current number of permits available in this semaphore.
+ *
+ * <p>This method is typically used for debugging and testing purposes.
+ *
+ * @return the number of permits available in this semaphore
+ */
+ public int availablePermits();
+
+ /**
+ * Acquires and returns all permits that are immediately available.
+ *
+ * @return the number of permits acquired
+ */
+ public int drainPermits();
+
+ /**
+ * Releases a permit, returning it to the semaphore.
+ *
+ * <p>Releases a permit, increasing the number of available permits by
+ * one. If any threads are trying to acquire a permit, then one is
+ * selected and given the permit that was just released. That thread
+ * is (re)enabled for thread scheduling purposes.
+ *
+ * <p>There is no requirement that a thread that releases a permit must
+ * have acquired that permit by calling {@link #acquire}.
+ * Correct usage of a semaphore is established by programming convention
+ * in the application.
+ */
+ public void release();
+
+ /**
+ * Acquires the given number of permits from this semaphore, if all
+ * become available within the given waiting time and the current
+ * thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires the given number of permits, if they are available and
+ * returns immediately, with the value {@code true},
+ * reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then
+ * the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of three things happens:
+ * <ul>
+ * <li>Some other thread invokes one of the {@link #release() release}
+ * methods for this semaphore, the current thread is next to be assigned
+ * permits and the number of available permits satisfies this request; or
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread; or
+ * <li>The specified waiting time elapses.
+ * </ul>
+ *
+ * <p>If the permits are acquired then the value {@code true} is returned.
+ *
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+ * to acquire the permits,
+ * </ul>
+ * then {@link IgniteInterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ * Any permits that were to be assigned to this thread, are instead
+ * assigned to other threads trying to acquire permits, as if
+ * the permits had been made available by a call to {@link #release()}.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all. Any permits that were to be assigned to this
+ * thread, are instead assigned to other threads trying to acquire
+ * permits, as if the permits had been made available by a call to
+ * {@link #release()}.
+ *
+ * @param permits the number of permits to acquire
+ * @param timeout the maximum time to wait for the permits
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if all permits were acquired and {@code false}
+ * if the waiting time elapsed before all permits were acquired
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+ throws IgniteInterruptedException;
+
+ /**
+ * Acquires the given number of permits from this semaphore, only
+ * if all are available at the time of invocation.
+ *
+ * <p>Acquires the given number of permits, if they are available, and
+ * returns immediately, with the value {@code true},
+ * reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then this method will return
+ * immediately with the value {@code false} and the number of available
+ * permits is unchanged.
+ *
+ * <p>Even when this semaphore has been set to use a fair ordering
+ * policy, a call to {@code tryAcquire} <em>will</em>
+ * immediately acquire a permit if one is available, whether or
+ * not other threads are currently waiting. This
+ * "barging" behavior can be useful in certain
+ * circumstances, even though it breaks fairness. If you want to
+ * honor the fairness setting, then use {@link #tryAcquire(int,
+ * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
+ * which is almost equivalent (it also detects interruption).
+ *
+ * @param permits the number of permits to acquire
+ * @return {@code true} if the permits were acquired and
+ * {@code false} otherwise
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public boolean tryAcquire(int permits);
+
+ /**
+ * Acquires the given number of permits from this semaphore,
+ * blocking until all are available,
+ * or the thread is {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires the given number of permits, if they are available,
+ * and returns immediately, reducing the number of available permits
+ * by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until
+ * one of two things happens:
+ * <ul>
+ * <li>Some other thread invokes one of the {@link #release() release}
+ * methods for this semaphore, the current thread is next to be assigned
+ * permits and the number of available permits satisfies this request; or
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread.
+ * </ul>
+ *
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+ * for a permit,
+ * </ul>
+ * then {@link IgniteInterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ * Any permits that were to be assigned to this thread are instead
+ * assigned to other threads trying to acquire permits, as if
+ * permits had been made available by a call to {@link #release()}.
+ *
+ * @param permits the number of permits to acquire
+ * @throws IgniteInterruptedException if the current thread is interrupted
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public void acquire(int permits) throws IgniteInterruptedException;
+
+ /**
+ * Releases the given number of permits, returning them to the semaphore.
+ *
+ * <p>Releases the given number of permits, increasing the number of
+ * available permits by that amount.
+ * If any threads are trying to acquire permits, then one
+ * is selected and given the permits that were just released.
+ * If the number of available permits satisfies that thread's request
+ * then that thread is (re)enabled for thread scheduling purposes;
+ * otherwise the thread will wait until sufficient permits are available.
+ * If there are still permits available
+ * after this thread's request has been satisfied, then those permits
+ * are assigned in turn to other threads trying to acquire permits.
+ *
+ * <p>There is no requirement that a thread that releases a permit must
+ * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
+ * Correct usage of a semaphore is established by programming convention
+ * in the application.
+ *
+ * @param permits the number of permits to release
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+ public void release(int permits);
+
+ /**
+ * Returns {@code true} if this semaphore has fairness set true.
+ *
+ * @return {@code true} if this semaphore has fairness set true
+ */
+ public boolean isFair();
+
+ /**
+ * Queries whether any threads are waiting to acquire. Note that
+ * because cancellations may occur at any time, a {@code true}
+ * return does not guarantee that any other thread will ever
+ * acquire. This method is designed primarily for use in
+ * monitoring of the system state.
+ *
+ * @return {@code true} if there may be other threads waiting to
+ * acquire permits from this semaphore
+ */
+ public boolean hasQueuedThreads();
+
+ /**
+ * Returns an estimate of the number of threads waiting to acquire.
+ * The value is only an estimate because the number of threads may
+ * change dynamically while this method traverses internal data
+ * structures. This method is designed for use in monitoring of the
+ * system state, not for synchronization control.
+ *
+ * @return the estimated number of threads waiting to acquire permits from this semaphore
+ */
+ public int getQueueLength();
+
+ /**
+ * Gets {@code removed} status of the semaphore.
+ *
+ * @return {@code True} if semaphore was removed from cache, {@code false} otherwise.
+ */
+ public boolean removed();
+
+ /**
+ * Removes this semaphore.
+ *
+ * @throws IgniteException If operation failed.
+ */
+ @Override public void close();
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..756278a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
@@ -2890,6 +2891,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteSemaphore semaphore(String name,
+ int cnt,
+ boolean fair,
+ boolean create) {
+ guard();
+
+ try {
+ return ctx.dataStructures().semaphore(name, cnt, fair, create);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index ef2c543..220dec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -42,6 +42,7 @@ import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
@@ -89,6 +90,7 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -131,6 +133,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** Cache contains only {@code GridCacheCountDownLatchValue}. */
private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView;
+ /** Cache contains only {@code GridCacheSemaphoreState}. */
+ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView;
+
/** Cache contains only {@code GridCacheAtomicReferenceValue}. */
private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
@@ -187,6 +192,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
cntDownLatchView = atomicsCache;
+ semaphoreView = atomicsCache;
+
atomicLongView = atomicsCache;
atomicRefView = atomicsCache;
@@ -1176,6 +1183,133 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
+ * Gets or creates semaphore. If semaphore is not found in cache,
+ * it is created using provided name, count and fairness parameters.
+ *
+ * @param name Name of the semaphore.
+ * @param cnt Initial count; must be non-negative when {@code create} is {@code true}.
+ * @param fair Fairness flag; it is stored in the semaphore state when the semaphore is created.
+ * @param create If {@code true} semaphore will be created in case it is not in cache,
+ *      if it is {@code false} all parameters except {@code name} are ignored.
+ * @return Semaphore for the given name or {@code null} if it is not found and
+ *      {@code create} is false.
+ * @throws IgniteCheckedException If operation failed.
+ */
+ public IgniteSemaphore semaphore(final String name,
+     final int cnt,
+     final boolean fair,
+     final boolean create)
+     throws IgniteCheckedException
+ {
+     A.notNull(name, "name");
+
+     awaitInitialization();
+
+     if (create)
+         A.ensure(cnt >= 0, "count can not be negative");
+
+     checkAtomicsConfiguration();
+
+     startQuery();
+
+     return getAtomic(new IgniteOutClosureX<IgniteSemaphore>() {
+         @Override public IgniteSemaphore applyx() throws IgniteCheckedException {
+             GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+             dsCacheCtx.gate().enter();
+
+             try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                 GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                 // Check that semaphore hasn't been created in other thread yet.
+                 GridCacheSemaphoreEx semaphore = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
+
+                 if (semaphore != null) {
+                     assert val != null;
+
+                     return semaphore;
+                 }
+
+                 if (val == null && !create)
+                     return null;
+
+                 if (val == null) {
+                     // Persist the requested fairness: the sync object is later chosen from the stored state,
+                     // so dropping the flag here would silently make every semaphore non-fair.
+                     val = new GridCacheSemaphoreState(cnt, 0, fair);
+
+                     dsView.put(key, val);
+                 }
+
+                 // Use the stored fairness so that the local handle agrees with an already existing state.
+                 semaphore = new GridCacheSemaphoreImpl(name, val.getCnt(),
+                     val.isFair(),
+                     key,
+                     semaphoreView,
+                     dsCacheCtx);
+
+                 dsMap.put(key, semaphore);
+
+                 tx.commit();
+
+                 return semaphore;
+             }
+             catch (Error | Exception e) {
+                 dsMap.remove(key);
+
+                 U.error(log, "Failed to create semaphore: " + name, e);
+
+                 throw e;
+             }
+             finally {
+                 dsCacheCtx.gate().leave();
+             }
+         }
+     }, new DataStructureInfo(name, SEMAPHORE, null), create, GridCacheSemaphoreEx.class);
+ }
+
+ /**
+  * Removes the semaphore with the given name from the data structures cache.
+  * Nothing is removed (and the transaction is marked rollback-only) when no state is stored under the name.
+  *
+  * @param name Name of the semaphore.
+  * @throws IgniteCheckedException If operation failed, or if the stored permit count is negative.
+  */
+ public void removeSemaphore(final String name) throws IgniteCheckedException {
+     assert name != null;
+     assert dsCacheCtx != null;
+
+     awaitInitialization();
+
+     IgniteOutClosureX<Void> rmvClo = new IgniteOutClosureX<Void>() {
+         @Override public Void applyx() throws IgniteCheckedException {
+             GridCacheInternal semKey = new GridCacheInternalKeyImpl(name);
+
+             dsCacheCtx.gate().enter();
+
+             try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                 // The cast also verifies that the stored object really is a semaphore state.
+                 GridCacheSemaphoreState state = cast(dsView.get(semKey), GridCacheSemaphoreState.class);
+
+                 if (state == null) {
+                     tx.setRollbackOnly();
+
+                     return null;
+                 }
+
+                 if (state.getCnt() < 0)
+                     throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
+
+                 dsView.remove(semKey);
+
+                 tx.commit();
+
+                 return null;
+             }
+             finally {
+                 dsCacheCtx.gate().leave();
+             }
+         }
+     };
+
+     removeDataStructure(rmvClo, name, SEMAPHORE, null);
+ }
+
+ /**
* Remove internal entry by key from cache.
*
* @param key Internal entry key.
@@ -1222,7 +1356,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
- return evt.getValue() instanceof GridCacheCountDownLatchValue;
+ return evt.getValue() instanceof GridCacheCountDownLatchValue ||
+ evt.getValue() instanceof GridCacheSemaphoreState;
else {
assert evt.getEventType() == EventType.REMOVED : evt;
@@ -1297,6 +1432,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
", actual=" + latch.getClass() + ", value=" + latch + ']');
}
}
+ else if(val0 instanceof GridCacheSemaphoreState) {
+ GridCacheInternalKey key = evt.getKey();
+
+ // Notify semaphore on changes.
+ final GridCacheRemovable semaphore = dsMap.get(key);
+
+ GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0;
+
+ if (semaphore instanceof GridCacheSemaphoreEx) {
+ final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)semaphore;
+
+ semaphore0.onUpdate(val);
+ }
+ else if (semaphore != null) {
+ U.error(log, "Failed to cast object " +
+ "[expected=" + IgniteSemaphore.class.getSimpleName() +
+ ", actual=" + semaphore.getClass() + ", value=" + semaphore + ']');
+ }
+ }
}
else {
@@ -1514,7 +1668,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
QUEUE(IgniteQueue.class.getSimpleName()),
/** */
- SET(IgniteSet.class.getSimpleName());
+ SET(IgniteSet.class.getSimpleName()),
+
+ /** */
+ SEMAPHORE(IgniteSemaphore.class.getSimpleName());
/** */
private static final DataStructureType[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
new file mode 100644
index 0000000..0f939d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -0,0 +1,22 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.IgniteSemaphore;
+
+/**
+ * Internal view of a distributed semaphore: an {@link IgniteSemaphore} that is also a
+ * {@link GridCacheRemovable}, exposes its internal cache key and accepts state update callbacks.
+ */
+public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
+    /**
+     * Get current semaphore key.
+     *
+     * @return Semaphore key.
+     */
+    public GridCacheInternalKey key();
+
+    /**
+     * Callback to notify semaphore on changes.
+     *
+     * @param val New semaphore state (permit count, waiter count and fairness flag) to apply locally.
+     */
+    public void onUpdate(GridCacheSemaphoreState val);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
new file mode 100644
index 0000000..24c3ec5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -0,0 +1,619 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer.
+ * Current implementation supports only unfair and locally fair modes.
+ * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
+ * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
+ * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
+ *
+ * @author Vladisav Jelisavcic
+ */
+public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Deserialization stash. */
+ private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
+ new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+ @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+ return F.t2();
+ }
+ };
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Semaphore name. */
+ private String name;
+
+ /** Removed flag.*/
+ private volatile boolean rmvd;
+
+ /** Semaphore key. */
+ private GridCacheInternalKey key;
+
+ /** Semaphore projection. */
+ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView;
+
+ /** Cache context. */
+ private GridCacheContext ctx;
+
+ /** Fairness flag. */
+ private boolean isFair;
+
+ /** Initial count. */
+ private transient final int initCnt;
+
+ /** Initialization guard. */
+ private final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Initialization latch. */
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+
+ /** Internal synchronization object. */
+ private Sync sync;
+
+ /**
+  * Empty constructor required by {@link Externalizable}.
+  */
+ public GridCacheSemaphoreImpl() {
+     // Only the final field has to be assigned here; all other fields keep their default values.
+     initCnt = 0;
+ }
+
+ /**
+  * Synchronization implementation for semaphore. Uses AQS state
+  * to represent the locally known number of permits, while every change of the permit
+  * count goes through a transactional compare-and-set on the cached {@link GridCacheSemaphoreState}.
+  * Subclassed into fair and nonfair versions.
+  */
+ abstract class Sync extends AbstractQueuedSynchronizer {
+     /** */
+     private static final long serialVersionUID = 1192457210091910933L;
+
+     /** Local threads registered as waiters, mapped to the waiter count observed when they registered. */
+     protected final ConcurrentMap<Thread, Integer> threadMap;
+
+     /** Last known number of waiters; volatile because {@link #getWaiters()} reads it without locking. */
+     protected volatile int totalWaiters;
+
+     /**
+      * @param permits Initial number of permits.
+      */
+     Sync(int permits) {
+         setState(permits);
+
+         threadMap = new ConcurrentHashMap<>();
+     }
+
+     /**
+      * @param waiters New number of waiters.
+      */
+     protected synchronized void setWaiters(int waiters) {
+         totalWaiters = waiters;
+     }
+
+     /**
+      * @return Last known number of waiters.
+      */
+     public int getWaiters() {
+         return totalWaiters;
+     }
+
+     /**
+      * Overwrites the local permit count (AQS state) without touching the cache.
+      *
+      * @param permits New local number of permits.
+      */
+     final synchronized void setPermits(int permits) {
+         setState(permits);
+     }
+
+     /**
+      * @return Locally known number of permits.
+      */
+     final int getPermits() {
+         return getState();
+     }
+
+     /**
+      * Tries to take permits without regard to queued threads.
+      *
+      * @param acquires Number of permits to acquire.
+      * @return Remaining permits, negative if there were not enough of them.
+      */
+     final int nonfairTryAcquireShared(int acquires) {
+         for (;;) {
+             int available = getState();
+             int remaining = available - acquires;
+
+             if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
+                 // NOTE(review): this also runs for a failed non-blocking tryAcquire(); the waiter count is only
+                 // decremented when a queued thread succeeds, so confirm it cannot leak for threads that never block.
+                 if (remaining < 0 && !threadMap.containsKey(Thread.currentThread()))
+                     getAndIncWaitingCount();
+
+                 return remaining;
+             }
+         }
+     }
+
+     /** {@inheritDoc} */
+     @Override protected final boolean tryReleaseShared(int releases) {
+         // Check if some other node updated the state.
+         // This method is called with release==0 only when trying to wake through update.
+         if (releases == 0)
+             return true;
+
+         for (;;) {
+             int current = getState();
+             int next = current + releases;
+
+             if (next < current) // overflow
+                 throw new Error("Maximum permit count exceeded");
+
+             if (compareAndSetGlobalState(current, next))
+                 return true;
+         }
+     }
+
+     /**
+      * Shrinks the number of permits by the given amount.
+      *
+      * @param reductions Number of permits to remove.
+      */
+     final void reducePermits(int reductions) {
+         for (;;) {
+             int current = getState();
+             int next = current - reductions;
+
+             if (next > current) // underflow
+                 throw new Error("Permit count underflow");
+
+             if (compareAndSetGlobalState(current, next))
+                 return;
+         }
+     }
+
+     /**
+      * Takes all permits that are available.
+      *
+      * @return Number of permits taken.
+      */
+     final int drainPermits() {
+         for (;;) {
+             int current = getState();
+
+             if (current == 0 || compareAndSetGlobalState(current, 0))
+                 return current;
+         }
+     }
+
+     /**
+      * Registers the current thread in {@link #threadMap} and increments the waiter count
+      * stored in the cache, all within one transaction.
+      */
+     protected void getAndIncWaitingCount() {
+         try {
+             CU.outTx(
+                 retryTopologySafe(new Callable<Boolean>() {
+                     @Override public Boolean call() throws Exception {
+                         try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                             GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                             if (val == null)
+                                 throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+
+                             int waiting = val.getWaiters();
+
+                             threadMap.put(Thread.currentThread(), waiting);
+
+                             val.setWaiters(waiting + 1);
+
+                             semaphoreView.put(key, val);
+
+                             tx.commit();
+
+                             return true;
+                         }
+                         catch (Error | Exception e) {
+                             // Log the semaphore itself, not the anonymous callable.
+                             U.error(log, "Failed to increment waiting count: " + GridCacheSemaphoreImpl.this, e);
+
+                             throw e;
+                         }
+                     }
+                 }),
+                 ctx
+             );
+         }
+         catch (IgniteCheckedException e) {
+             throw U.convertException(e);
+         }
+     }
+
+     /**
+      * Transactionally replaces the cached permit count if it equals the expected value.
+      * The local AQS state is not modified here.
+      *
+      * @param expVal Expected permit count.
+      * @param newVal New permit count.
+      * @return {@code True} if the cached count matched {@code expVal} and was replaced.
+      */
+     protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
+         try {
+             return CU.outTx(
+                 retryTopologySafe(new Callable<Boolean>() {
+                     @Override public Boolean call() throws Exception {
+                         try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                             GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                             if (val == null)
+                                 throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+
+                             boolean retVal = val.getCnt() == expVal;
+
+                             if (retVal) {
+                                 /* If current thread is queued, then this call is the call that is going to be unblocked. */
+                                 if (isQueued(Thread.currentThread())) {
+                                     val.setWaiters(val.getWaiters() - 1);
+
+                                     threadMap.remove(Thread.currentThread());
+                                 }
+
+                                 val.setCnt(newVal);
+
+                                 semaphoreView.put(key, val);
+
+                                 tx.commit();
+                             }
+
+                             return retVal;
+                         }
+                         catch (Error | Exception e) {
+                             // Log the semaphore itself, not the anonymous callable.
+                             U.error(log, "Failed to compare and set: " + GridCacheSemaphoreImpl.this, e);
+
+                             throw e;
+                         }
+                     }
+                 }),
+                 ctx
+             );
+         }
+         catch (IgniteCheckedException e) {
+             throw U.convertException(e);
+         }
+     }
+ }
+
+ /**
+  * Non-fair version: permits are taken without checking for threads that are already queued.
+  */
+ final class NonfairSync extends Sync {
+     /** */
+     private static final long serialVersionUID = 7983135489326435495L;
+
+     /**
+      * @param permits Initial number of permits.
+      */
+     NonfairSync(int permits) {
+         super(permits);
+     }
+
+     /** {@inheritDoc} */
+     @Override protected int tryAcquireShared(int acquires) {
+         return nonfairTryAcquireShared(acquires);
+     }
+ }
+
+ /**
+  * Fair version: a thread gives way whenever other threads are queued ahead of it on this node.
+  */
+ final class FairSync extends Sync {
+     /** */
+     private static final long serialVersionUID = 3468129658421667L;
+
+     /**
+      * @param permits Initial number of permits.
+      */
+     FairSync(int permits) {
+         super(permits);
+     }
+
+     /** {@inheritDoc} */
+     @Override protected int tryAcquireShared(int acquires) {
+         for (;;) {
+             // Locally queued predecessors go first.
+             if (hasQueuedPredecessors())
+                 return -1;
+
+             int available = getState();
+             int remaining = available - acquires;
+
+             if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
+                 // Not enough permits: register the thread as a waiter once.
+                 if (remaining < 0 && !threadMap.containsKey(Thread.currentThread()))
+                     getAndIncWaitingCount();
+
+                 return remaining;
+             }
+         }
+     }
+ }
+
+ /**
+  * Constructor.
+  *
+  * @param name Semaphore name.
+  * @param initCnt Initial count.
+  * @param fair Fairness flag.
+  * @param key Semaphore key.
+  * @param semaphoreView Semaphore projection.
+  * @param ctx Cache context.
+  */
+ public GridCacheSemaphoreImpl(String name,
+     int initCnt,
+     boolean fair,
+     GridCacheInternalKey key,
+     IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+     GridCacheContext ctx)
+ {
+     assert name != null;
+     assert key != null;
+     assert semaphoreView != null;
+     assert ctx != null;
+
+     this.name = name;
+     this.initCnt = initCnt;
+     this.key = key;
+     this.semaphoreView = semaphoreView;
+     this.ctx = ctx;
+     this.isFair = fair;
+
+     log = ctx.logger(getClass());
+ }
+
+ /**
+  * Lazily creates the internal synchronization object from the state stored in cache.
+  * The first caller performs the initialization, concurrent callers wait for it to finish.
+  *
+  * @throws IgniteCheckedException If operation failed or the semaphore state could not be found in cache.
+  */
+ private void initializeSemaphore() throws IgniteCheckedException {
+     if (initGuard.compareAndSet(false, true)) {
+         try {
+             sync = CU.outTx(
+                 retryTopologySafe(new Callable<Sync>() {
+                     @Override public Sync call() throws Exception {
+                         try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                             GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                             if (val == null) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Failed to find semaphore with given name: " + name);
+
+                                 return null;
+                             }
+
+                             final int count = val.getCnt();
+
+                             tx.commit();
+
+                             return val.isFair() ? new FairSync(count) : new NonfairSync(count);
+                         }
+                     }
+                 }),
+                 ctx
+             );
+
+             if (log.isDebugEnabled())
+                 log.debug("Initialized internal sync structure: " + sync);
+         }
+         finally {
+             // Always release waiting threads, even if initialization failed.
+             initLatch.countDown();
+         }
+     }
+     else
+         U.await(initLatch);
+
+     // Checked for the initializing thread as well: otherwise a missing state would surface
+     // later as a NullPointerException on the first use of the sync object.
+     if (sync == null)
+         throw new IgniteCheckedException("Internal semaphore has not been properly initialized.");
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+     return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheInternalKey key() {
+     return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+     return rmvd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onRemoved() {
+     // Sets the removed flag and reports true.
+     return rmvd = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdate(GridCacheSemaphoreState val) {
+ if(sync == null)
+ return;
+
+ // Update permission count.
+ sync.setPermits(val.getCnt());
+
+ // Update waiters count.
+ sync.setWaiters(val.getWaiters());
+
+ // Try to notify any waiting threads.
+ sync.releaseShared(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void needCheckNotRemoved() {
+     // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acquire() throws IgniteException {
+     // Single-permit shortcut.
+     acquire(1);
+ }
+
+ @Override
+ public void acquire(int permits) throws IgniteInterruptedException {
+ A.ensure(permits >= 0, "Number of permits must be non-negative.");
+ try {
+ initializeSemaphore();
+ sync.acquireSharedInterruptibly(permits);
+
+ } catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ } catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly() {
+     // Single-permit shortcut; the argument check in the overload always passes for 1.
+     acquireUninterruptibly(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly(int permits) {
+     A.ensure(permits >= 0, "Number of permits must be non-negative.");
+     try {
+         // Make sure the internal sync object exists before it is used.
+         initializeSemaphore();
+         sync.acquireShared(permits);
+
+     } catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ @Override
+ public int availablePermits(){
+ int ret;
+ try {
+ initializeSemaphore();
+ ret = CU.outTx(
+ retryTopologySafe(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
+
+ if (val == null)
+ throw new IgniteException("Failed to find semaphore with given name: " + name);
+
+ int count = val.getCnt();
+ tx.rollback();
+
+ return count;
+ }
+ }
+ }),
+ ctx
+ );
+ } catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ return ret;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int drainPermits() {
+     try {
+         // Make sure the internal sync object exists before it is used.
+         initializeSemaphore();
+         return sync.drainPermits();
+
+     } catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire() {
+     // Single-permit shortcut; the argument check in the overload always passes for 1.
+     return tryAcquire(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+     // Single-permit shortcut; the argument check in the overload always passes for 1.
+     return tryAcquire(1, timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+     // Single-permit shortcut.
+     release(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release(int permits) {
+     A.ensure(permits >= 0, "Number of permits must be non-negative.");
+     try {
+         // Make sure the internal sync object exists before it is used.
+         initializeSemaphore();
+         sync.releaseShared(permits);
+
+     } catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits) {
+     A.ensure(permits >= 0, "Number of permits must be non-negative.");
+     try {
+         initializeSemaphore();
+         // Always uses the non-fair attempt, whichever sync implementation is in place.
+         return sync.nonfairTryAcquireShared(permits) >= 0;
+
+     } catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+     A.ensure(permits >= 0, "Number of permits must be non-negative.");
+     try {
+         initializeSemaphore();
+         // The timeout is converted to nanoseconds for the AQS call.
+         return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+
+     } catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     } catch (InterruptedException e) {
+         throw new IgniteInterruptedException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isFair() {
+     // Report the fairness this semaphore was constructed with instead of a hard-coded false.
+     return isFair;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasQueuedThreads() {
+     // Threads are queued exactly when the known waiter count is not zero.
+     return getQueueLength() != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getQueueLength() {
+     try {
+         initializeSemaphore();
+         // Returns the waiter count last applied through onUpdate(); the cache is not queried here.
+         return sync.getWaiters();
+     } catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+     // Only the kernal context and the semaphore name are written.
+     out.writeObject(ctx.kernalContext());
+     out.writeUTF(name);
+ }
+
+ /**
+  * Reads the kernal context and the semaphore name into the thread-local stash;
+  * {@link #readResolve()} then swaps the deserialized shell for the node-local semaphore.
+  *
+  * @param in Stream to read from.
+  * @throws IOException If reading failed.
+  * @throws ClassNotFoundException If the class of the written context cannot be found.
+  */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+     IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+     t.set1((GridKernalContext)in.readObject());
+     t.set2(in.readUTF());
+ }
+
+ /**
+  * Reconstructs the object on unmarshalling: looks the semaphore up by the stashed name
+  * (without creating it) and always clears the stash so the kernal context is not retained by the thread.
+  *
+  * @return Semaphore registered under the stashed name, or {@code null} if there is none.
+  * @throws java.io.ObjectStreamException If the lookup failed.
+  */
+ private Object readResolve() throws java.io.ObjectStreamException {
+     try {
+         IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+         return t.get1().dataStructures().semaphore(t.get2(), 0, false, false);
+     }
+     catch (IgniteCheckedException e) {
+         java.io.InvalidObjectException err = new java.io.InvalidObjectException(e.getMessage());
+
+         err.initCause(e);
+
+         throw err;
+     }
+     finally {
+         stash.remove();
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+     // Already removed: nothing left to do.
+     if (rmvd)
+         return;
+
+     try {
+         ctx.kernalContext().dataStructures().removeSemaphore(name);
+     }
+     catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+     // String form is produced by the S.toString helper for this class.
+     return S.toString(GridCacheSemaphoreImpl.class, this);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
new file mode 100644
index 0000000..cf44b7d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -0,0 +1,128 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+
+/**
+ * Grid cache semaphore state: permit count, number of waiters and fairness flag.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Permission count. */
+    private int cnt;
+
+    /** Number of waiters. */
+    private int waiters;
+
+    /** Fairness flag. */
+    private boolean fair;
+
+    /**
+     * Creates a non-fair state.
+     *
+     * @param cnt Number of permissions.
+     * @param waiters Number of waiters.
+     */
+    public GridCacheSemaphoreState(int cnt, int waiters) {
+        this(cnt, waiters, false);
+    }
+
+    /**
+     * Creates a state with explicit fairness.
+     *
+     * @param cnt Number of permissions.
+     * @param waiters Number of waiters.
+     * @param fair Fairness flag.
+     */
+    public GridCacheSemaphoreState(int cnt, int waiters, boolean fair) {
+        this.cnt = cnt;
+        this.waiters = waiters;
+        this.fair = fair;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreState() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void setCnt(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int getCnt() {
+        return cnt;
+    }
+
+    /**
+     * @return Number of waiters.
+     */
+    public int getWaiters() {
+        return waiters;
+    }
+
+    /**
+     * @param waiters New number of waiters.
+     */
+    public void setWaiters(int waiters) {
+        this.waiters = waiters;
+    }
+
+    /**
+     * @return Fairness flag.
+     */
+    public boolean isFair() {
+        return fair;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Field order must mirror readExternal.
+        out.writeInt(cnt);
+        out.writeInt(waiters);
+        out.writeBoolean(fair);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        // Field order must mirror writeExternal.
+        cnt = in.readInt();
+        waiters = in.readInt();
+        fair = in.readBoolean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSemaphoreState.class, this);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
new file mode 100644
index 0000000..689b647
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
@@ -0,0 +1,115 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Externalizable value holding a permit count together with a semaphore ID.
+ */
+public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Permission count. */
+    private int cnt;
+
+    /** Semaphore ID. */
+    private long semaphoreId;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Number of permissions.
+     * @param semaphoreId Semaphore ID.
+     */
+    public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
+        this.semaphoreId = semaphoreId;
+        this.cnt = cnt;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreValue() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void set(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int get() {
+        return cnt;
+    }
+
+    /**
+     * @return {@code True} if the count is positive, i.e. permissions are to be added.
+     */
+    public boolean isRelease() {
+        return cnt > 0;
+    }
+
+    /**
+     * @return {@code True} if the count is negative, i.e. the permission count should be lowered.
+     */
+    public boolean isAwait() {
+        return cnt < 0;
+    }
+
+    /**
+     * @return Semaphore ID.
+     */
+    public long semaphoreId() {
+        return semaphoreId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Field order must mirror readExternal.
+        out.writeInt(cnt);
+        out.writeLong(semaphoreId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        // Field order must mirror writeExternal.
+        cnt = in.readInt();
+        semaphoreId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSemaphoreValue.class, this);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 964753d..b633a6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteFileSystem;
@@ -308,6 +309,15 @@ public class IgniteMock implements Ignite {
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteSemaphore semaphore(String name,
+     int cnt,
+     boolean fair,
+     boolean create)
+ {
+     // The mock provides no semaphore: every lookup reports absence.
+     return null;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index ec7dab7..495fd6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
@@ -536,6 +537,12 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
+ @Override public IgniteSemaphore semaphore(String name, int cnt, boolean fair,
+     boolean create) throws IgniteException {
+     // Not implemented by this proxy; callers get an UnsupportedOperationException.
+     throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public <T> IgniteQueue<T> queue(String name, int cap,
@Nullable CollectionConfiguration cfg) throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 42514e3..4867a10 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -398,6 +398,17 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteSemaphore semaphore(String name,
+     int cnt,
+     boolean fair,
+     boolean create)
+ {
+     // The wrapped Ignite instance must be set before the call is delegated to it.
+     assert g != null;
+
+     return g.semaphore(name, cnt, fair, create);
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg)
[5/6] ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into master-sem-review-1
Posted by yz...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-sem-review-1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad99064e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad99064e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad99064e
Branch: refs/heads/ignite-638
Commit: ad99064e87c42d91dfc303d8d87861d2ed003bed
Parents: 8c6852d 0bc1d6f
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Oct 23 17:40:05 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Oct 23 17:40:05 2015 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 3 +-
RELEASE_NOTES.txt | 2 +
assembly/dependencies-fabric.xml | 2 +-
assembly/dependencies-hadoop.xml | 1 +
assembly/release-fabric-lgpl.xml | 63 +
assembly/release-fabric.xml | 10 +-
assembly/release-hadoop-lgpl.xml | 39 +
examples-lgpl/README.txt | 27 +
examples-lgpl/config/example-cache.xml | 73 +
examples-lgpl/config/example-ignite.xml | 83 ++
examples-lgpl/config/hibernate/README.txt | 8 +
.../hibernate/example-hibernate-L2-cache.xml | 64 +
examples-lgpl/pom-standalone.xml | 186 +++
examples-lgpl/pom.xml | 128 ++
.../hibernate/HibernateL2CacheExample.java | 245 ++++
.../examples/datagrid/hibernate/Post.java | 130 ++
.../examples/datagrid/hibernate/User.java | 154 ++
.../datagrid/hibernate/package-info.java | 22 +
.../hibernate/CacheHibernatePersonStore.java | 122 ++
.../hibernate/CacheHibernateStoreExample.java | 151 ++
.../datagrid/store/hibernate/Person.hbm.xml | 34 +
.../datagrid/store/hibernate/hibernate.cfg.xml | 41 +
.../datagrid/store/hibernate/package-info.java | 22 +
.../misc/schedule/ComputeScheduleExample.java | 82 ++
.../examples/misc/schedule/package-info.java | 22 +
.../misc/schedule/ComputeScheduleExample.java | 68 +
.../java8/misc/schedule/package-info.java | 22 +
.../ignite/examples/java8/package-info.java | 23 +
.../scalar/examples/ScalarScheduleExample.scala | 66 +
...ibernateL2CacheExampleMultiNodeSelfTest.java | 31 +
.../HibernateL2CacheExampleSelfTest.java | 33 +
.../IgniteLgplExamplesSelfTestSuite.java | 48 +
...ibernateL2CacheExampleMultiNodeSelfTest.java | 29 +
.../HibernateL2CacheExampleSelfTest.java | 37 +
.../IgniteLgplExamplesJ8SelfTestSuite.java | 46 +
.../ScalarLgplExamplesMultiNodeSelfTest.scala | 33 +
.../examples/ScalarLgplExamplesSelfTest.scala | 36 +
.../ScalarLgplExamplesSelfTestSuite.scala | 37 +
examples/config/hibernate/README.txt | 8 -
.../hibernate/example-hibernate-L2-cache.xml | 64 -
examples/pom-standalone.xml | 12 -
examples/pom.xml | 12 -
examples/schema-import/pom-standalone.xml | 90 ++
examples/schema-import/pom.xml | 13 +-
.../hibernate/HibernateL2CacheExample.java | 245 ----
.../examples/datagrid/hibernate/Post.java | 130 --
.../examples/datagrid/hibernate/User.java | 154 --
.../datagrid/hibernate/package-info.java | 22 -
.../hibernate/CacheHibernatePersonStore.java | 122 --
.../hibernate/CacheHibernateStoreExample.java | 151 --
.../datagrid/store/hibernate/Person.hbm.xml | 34 -
.../datagrid/store/hibernate/hibernate.cfg.xml | 41 -
.../datagrid/store/hibernate/package-info.java | 22 -
.../misc/schedule/ComputeScheduleExample.java | 82 --
.../examples/misc/schedule/package-info.java | 22 -
.../misc/schedule/ComputeScheduleExample.java | 68 -
.../java8/misc/schedule/package-info.java | 22 -
.../scalar/examples/ScalarScheduleExample.scala | 66 -
...ibernateL2CacheExampleMultiNodeSelfTest.java | 31 -
.../HibernateL2CacheExampleSelfTest.java | 33 -
.../testsuites/IgniteExamplesSelfTestSuite.java | 4 -
...ibernateL2CacheExampleMultiNodeSelfTest.java | 29 -
.../HibernateL2CacheExampleSelfTest.java | 37 -
.../tests/examples/ScalarExamplesSelfTest.scala | 5 -
modules/apache-license-gen/README.txt | 33 +
modules/apache-license-gen/pom.xml | 3 +
.../JettyRestProcessorAbstractSelfTest.java | 252 +++-
.../org/apache/ignite/IgniteFileSystem.java | 2 +
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../org/apache/ignite/IgniteTransactions.java | 4 -
.../discovery/GridDiscoveryManager.java | 57 +-
.../internal/portable/PortableContext.java | 19 +-
.../portable/api/PortableMarshaller.java | 14 +-
.../cache/DynamicCacheDescriptor.java | 17 +
.../processors/cache/GridCacheAdapter.java | 357 +++--
.../cache/GridCacheClearAllRunnable.java | 18 +-
.../cache/GridCacheConcurrentMap.java | 4 +-
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 29 +-
.../processors/cache/GridCacheMvccManager.java | 20 +-
.../GridCachePartitionExchangeManager.java | 72 +-
.../processors/cache/GridCacheProcessor.java | 57 +-
.../processors/cache/GridCacheProxyImpl.java | 14 +-
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../processors/cache/IgniteInternalCache.java | 19 +-
.../dht/GridClientPartitionTopology.java | 13 +-
.../distributed/dht/GridDhtCacheAdapter.java | 6 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 4 +-
.../distributed/dht/GridDhtLocalPartition.java | 3 +-
.../distributed/dht/GridDhtLockRequest.java | 5 +-
.../dht/GridDhtPartitionTopologyImpl.java | 12 +-
.../dht/GridPartitionedGetFuture.java | 5 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 17 +-
.../colocated/GridDhtColocatedLockFuture.java | 11 +-
.../GridDhtPartitionDemandMessage.java | 4 +-
.../GridDhtPartitionSupplyMessage.java | 3 +-
.../GridDhtPartitionsExchangeFuture.java | 12 +-
.../preloader/GridDhtPartitionsFullMessage.java | 12 +-
.../GridDhtPartitionsSingleMessage.java | 11 +-
.../dht/preloader/GridDhtPreloader.java | 18 +-
.../distributed/near/GridNearCacheAdapter.java | 21 +-
.../near/GridNearCacheClearAllRunnable.java | 9 +-
.../distributed/near/GridNearGetFuture.java | 2 +
.../distributed/near/GridNearLockFuture.java | 11 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 24 +-
.../cache/query/GridCacheQueryManager.java | 33 +-
.../cache/query/GridCacheSqlIndexMetadata.java | 7 +-
.../cache/query/GridCacheSqlMetadata.java | 22 +-
.../datastructures/DataStructuresProcessor.java | 48 +-
.../processors/igfs/IgfsDataManager.java | 2 -
.../processors/igfs/IgfsDeleteWorker.java | 102 +-
.../internal/processors/igfs/IgfsFileInfo.java | 13 +-
.../internal/processors/igfs/IgfsImpl.java | 228 +--
.../processors/igfs/IgfsMetaManager.java | 909 ++++++++----
.../processors/igfs/IgfsOutputStreamImpl.java | 2 +
.../internal/processors/igfs/IgfsUtils.java | 23 +
.../processors/rest/GridRestCommand.java | 8 +-
.../processors/rest/GridRestProcessor.java | 364 ++++-
.../handlers/cache/GridCacheCommandHandler.java | 364 ++---
.../handlers/query/QueryCommandHandler.java | 195 ++-
.../top/GridTopologyCommandHandler.java | 31 +-
.../rest/request/RestQueryRequest.java | 175 +++
.../rest/request/RestSqlQueryRequest.java | 125 --
.../ignite/internal/util/GridJavaProcess.java | 12 +-
.../ignite/internal/util/IgniteUtils.java | 2 +-
.../ignite/internal/util/lang/GridFunc.java | 12 +
.../ignite/internal/util/nio/GridNioServer.java | 2 +-
.../apache/ignite/marshaller/Marshaller.java | 2 +-
.../optimized/OptimizedMarshallerUtils.java | 6 +-
.../communication/tcp/TcpCommunicationSpi.java | 22 +-
.../ignite/spi/deployment/DeploymentSpi.java | 8 +-
modules/core/src/test/config/tests.properties | 3 +
.../ignite/igfs/IgfsEventsAbstractSelfTest.java | 6 +-
.../cache/CacheAffinityCallSelfTest.java | 4 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 486 ++++---
.../cache/GridCacheClearSelfTest.java | 308 ++++
.../GridCacheDeploymentOffHeapSelfTest.java | 15 +
.../IgniteCacheConfigurationTemplateTest.java | 31 +
.../cache/IgniteCacheEntryListenerTxTest.java | 4 +
.../cache/IgniteCachePutAllRestartTest.java | 4 +-
.../CacheAbstractRestartSelfTest.java | 247 ++++
.../CacheGetFutureHangsSelfTest.java | 159 +--
...NearDisabledAtomicInvokeRestartSelfTest.java | 179 +++
...abledTransactionalInvokeRestartSelfTest.java | 173 +++
...edTransactionalWriteReadRestartSelfTest.java | 124 ++
.../CacheNoValueClassOnServerNodeTest.java | 1 +
...niteCacheClientNodeChangingTopologyTest.java | 10 +-
.../distributed/IgniteCacheCreatePutTest.java | 125 ++
.../dht/GridCacheDhtEntrySelfTest.java | 2 +-
.../dht/GridCacheDhtPreloadPerformanceTest.java | 133 ++
.../dht/GridCacheTxNodeFailureSelfTest.java | 2 +
.../dht/GridNearCacheTxNodeFailureSelfTest.java | 4 +
...gniteAtomicLongChangingTopologySelfTest.java | 159 ++-
...tomicClientOnlyMultiNodeFullApiSelfTest.java | 71 +-
...ledFairAffinityMultiNodeFullApiSelfTest.java | 8 +-
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 7 +-
.../near/GridCacheNearTxExceptionSelfTest.java | 4 +
.../near/GridCacheNearTxForceKeyTest.java | 2 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 129 +-
.../replicated/GridReplicatedTxPreloadTest.java | 7 +-
...bledFairAffinityMultiJvmFullApiSelfTest.java | 5 +
...tomicNearEnabledMultiJvmFullApiSelfTest.java | 5 +
.../DataStreamerMultiThreadedSelfTest.java | 4 +-
.../DataStreamerMultinodeCreateCacheTest.java | 2 +
.../processors/igfs/IgfsAbstractSelfTest.java | 812 ++++++++---
.../igfs/IgfsDataManagerSelfTest.java | 13 +-
.../igfs/IgfsMetaManagerSelfTest.java | 170 +--
.../processors/igfs/IgfsProcessorSelfTest.java | 12 +-
.../nio/IgniteExceptionInNioWorkerSelfTest.java | 105 ++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 2 +
.../testframework/junits/GridAbstractTest.java | 116 +-
.../junits/IgniteTestResources.java | 8 +-
.../junits/common/GridCommonAbstractTest.java | 15 +-
.../junits/multijvm/AffinityProcessProxy.java | 440 ++++--
.../multijvm/IgniteCacheProcessProxy.java | 1346 ++++++++++++++----
.../multijvm/IgniteClusterProcessProxy.java | 115 +-
.../multijvm/IgniteEventsProcessProxy.java | 50 +-
.../junits/multijvm/IgniteNodeRunner.java | 39 +-
.../junits/multijvm/IgniteProcessProxy.java | 107 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 3 +
.../IgniteCacheFullApiSelfTestSuite.java | 8 +-
.../IgniteCacheLoadConsistencyTestSuite.java | 42 +
.../testsuites/IgniteCacheTestSuite4.java | 5 +
.../ignite/testsuites/IgniteIgfsTestSuite.java | 6 +
modules/extdata/uri/pom.xml | 11 +-
.../hadoop/igfs/HadoopIgfsWrapper.java | 54 +-
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 27 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 2 +-
.../CacheHibernateBlobStoreSelfTest.java | 6 +-
.../cache/CacheConfigurationP2PTest.java | 3 +
.../cache/SqlFieldsQuerySelfTest.java | 172 +++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../main/cpp/common/project/vs/common.vcxproj | 4 +-
.../http/jetty/GridJettyJsonConfig.java | 158 +-
.../http/jetty/GridJettyRestHandler.java | 186 +--
.../spi/deployment/uri/UriDeploymentSpi.java | 93 +-
.../scanners/http/UriDeploymentHttpScanner.java | 10 +-
.../http/GridHttpDeploymentSelfTest.java | 132 +-
.../visor/commands/kill/VisorKillCommand.scala | 2 +-
.../scala/org/apache/ignite/visor/visor.scala | 1 -
modules/yardstick/pom.xml | 10 +-
pom.xml | 88 +-
202 files changed, 10323 insertions(+), 4135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad99064e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad99064e/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
[6/6] ignite git commit: review
Posted by yz...@apache.org.
review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f38a18e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f38a18e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f38a18e
Branch: refs/heads/ignite-638
Commit: 5f38a18e1dcb03523d151e2693f7bd7c2fedca81
Parents: ad99064
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Oct 23 19:09:35 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Oct 23 19:09:35 2015 +0300
----------------------------------------------------------------------
.../datastructures/IgniteSemaphoreExample.java | 69 +++++++++++++-------
.../src/main/java/org/apache/ignite/Ignite.java | 4 +-
.../java/org/apache/ignite/IgniteSemaphore.java | 29 ++++++--
.../datastructures/DataStructuresProcessor.java | 32 +++++----
.../datastructures/GridCacheSemaphoreEx.java | 17 +++++
.../datastructures/GridCacheSemaphoreImpl.java | 57 +++++++++++-----
.../datastructures/GridCacheSemaphoreState.java | 55 +++++++++-------
.../IgniteSemaphoreAbstractSelfTest.java | 1 +
.../IgnitePartitionedSemaphoreSelfTest.java | 1 -
9 files changed, 182 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index ece0ffc..1c078b0 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.examples.datastructures;
import java.util.UUID;
@@ -8,17 +25,17 @@ import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.lang.IgniteRunnable;
/**
- * This example demonstrates cache based semaphore. <p> Remote nodes should always be started with special configuration
- * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. <p> Alternatively
- * you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * This example demonstrates cache based semaphore.
+ * <p>
+ * Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
* examples/config/example-ignite.xml} configuration.
*/
public class IgniteSemaphoreExample {
- /** Cache name. */
- private static final String CACHE_NAME = IgniteSemaphoreExample.class.getSimpleName();
-
/** Number of items for each producer/consumer to produce/consume. */
- private static final int ITEM_COUNT = 100;
+ private static final int OPS_COUNT = 100;
/** Number of producers. */
private static final int NUM_PRODUCERS = 10;
@@ -27,15 +44,20 @@ public class IgniteSemaphoreExample {
private static final int NUM_CONSUMERS = 10;
/** Synchronization semaphore name. */
- private static final String syncName = IgniteSemaphoreExample.class.getSimpleName();
+ private static final String SEM_NAME = IgniteSemaphoreExample.class.getSimpleName();
+ /**
+ * Executes example.
+ *
+ * @param args Command line arguments, none required.
+ */
public static void main(String[] args) {
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
System.out.println();
System.out.println(">>> Cache atomic semaphore example started.");
// Initialize semaphore.
- IgniteSemaphore syncSemaphore = ignite.semaphore(syncName, 0, false, true);
+ IgniteSemaphore syncSemaphore = ignite.semaphore(SEM_NAME, 0, false, true);
// Make name of semaphore.
final String semaphoreName = UUID.randomUUID().toString();
@@ -82,7 +104,6 @@ public class IgniteSemaphoreExample {
* Closure which simply signals the semaphore.
*/
private static class Producer extends SemaphoreExampleClosure {
-
/**
* @param semaphoreName Semaphore name.
*/
@@ -94,20 +115,21 @@ public class IgniteSemaphoreExample {
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- for (int i = 0; i < ITEM_COUNT; i++) {
- System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
+ for (int i = 0; i < OPS_COUNT; i++) {
+ System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+ ", available=" + semaphore.availablePermits() + ']');
// Signals others that shared resource is available.
semaphore.release();
}
- System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+ System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
// Gets the syncing semaphore
- IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+ IgniteSemaphore sem = Ignition.ignite().semaphore(SEM_NAME, 0, true, true);
// Signals the master thread
- sync.release();
+ sem.release();
}
}
@@ -115,7 +137,6 @@ public class IgniteSemaphoreExample {
* Closure which simply waits on semaphore.
*/
private static class Consumer extends SemaphoreExampleClosure {
-
/**
* @param semaphoreName Semaphore name.
*/
@@ -125,23 +146,23 @@ public class IgniteSemaphoreExample {
/** {@inheritDoc} */
@Override public void run() {
- IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+ IgniteSemaphore sem = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- for (int i = 0; i < ITEM_COUNT; i++) {
+ for (int i = 0; i < OPS_COUNT; i++) {
// Block if no permits are available.
- semaphore.acquire();
+ sem.acquire();
- System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
+ System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+ ", available=" + sem.availablePermits() + ']');
}
- System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+ System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
// Gets the syncing semaphore
- IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+ IgniteSemaphore sync = Ignition.ignite().semaphore(SEM_NAME, 3, true, true);
- // Signals the master thread
+ // Signals the master thread.
sync.release();
}
}
}
-
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index dffb126..5437903 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -431,7 +431,7 @@ public interface Ignite extends AutoCloseable {
* @throws IgniteException If semaphore could not be fetched or created.
*/
public IgniteSemaphore semaphore(String name, int cnt, boolean fair, boolean create)
- throws IgniteException;
+ throws IgniteException;
/**
* Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
@@ -491,4 +491,4 @@ public interface Ignite extends AutoCloseable {
* @return Affinity.
*/
public <K> Affinity<K> affinity(String cacheName);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index 5a4b377..1db79a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -1,13 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
/**
- * This interface provides a rich API for working with distributed semaphore. <p> <h1 class="header">Functionality</h1>
- * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. <h1
- * class="header">Creating Distributed Semaphore</h1> Instance of cache semaphore can be created by calling the
- * following method: {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * Instance of cache semaphore can be created by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
*/
public interface IgniteSemaphore extends Closeable {
/**
@@ -291,4 +311,3 @@ public interface IgniteSemaphore extends Closeable {
*/
@Override public void close();
}
-
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 82d6725..76db4ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1250,11 +1250,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.put(key, val);
}
- semaphore = new GridCacheSemaphoreImpl(name, val.getCount(),
- fair,
- key,
- semaphoreView,
- dsCacheCtx);
+ semaphore = new GridCacheSemaphoreImpl(
+ name, val.getCount(),
+ fair,
+ key,
+ semaphoreView,
+ dsCacheCtx);
dsMap.put(key, semaphore);
@@ -1299,19 +1300,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
if (val != null) {
- if (val.getCount() < 0) {
- throw new IgniteCheckedException("Failed to remove semaphore " +
- "with blocked threads. ");
- }
+ if (val.getCount() < 0)
+ throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
dsView.remove(key);
tx.commit();
- } else
+ }
+ else
tx.setRollbackOnly();
return null;
- } finally {
+ }
+ finally {
dsCacheCtx.gate().leave();
}
}
@@ -1327,7 +1328,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If removing failed or class of object is different to expected class.
*/
private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
- return CU.outTx(new Callable<Boolean>() {
+ return CU.outTx(
+ new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
@@ -1365,7 +1367,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
return evt.getValue() instanceof GridCacheCountDownLatchValue ||
- evt.getValue() instanceof GridCacheSemaphoreState;
+ evt.getValue() instanceof GridCacheSemaphoreState;
else {
assert evt.getEventType() == EventType.REMOVED : evt;
@@ -1573,7 +1575,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
catch (ClusterGroupEmptyCheckedException e) {
throw new IgniteCheckedException(e);
}
- catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
+ catch (IgniteTxRollbackCheckedException |
+ CachePartialUpdateCheckedException |
+ ClusterTopologyCheckedException e) {
if (cnt++ == MAX_UPDATE_RETRIES)
throw e;
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 8ecbcc5..76d8bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.datastructures;
import org.apache.ignite.IgniteSemaphore;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 78d923a..85ba9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.datastructures;
import java.io.Externalizable;
@@ -90,8 +107,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
/**
- * Synchronization implementation for semaphore. Uses AQS state to represent permits. Subclassed into fair and
- * nonfair versions.
+ * Synchronization implementation for semaphore.
+ * Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
*/
abstract class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
@@ -195,7 +212,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
*/
final int drainPermits() {
for (; ; ) {
-
int current = getState();
if (current == 0 || compareAndSetGlobalState(current, 0))
@@ -212,7 +228,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (
+ IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)
+ ) {
GridCacheSemaphoreState val = semaphoreView.get(key);
if (val == null)
@@ -255,18 +273,22 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (
+ IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView,
+ PESSIMISTIC, REPEATABLE_READ)
+ ) {
GridCacheSemaphoreState val = semaphoreView.get(key);
if (val == null)
- throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+ throw new IgniteCheckedException("Failed to find semaphore with given name: " +
+ name);
boolean retVal = val.getCount() == expVal;
if (retVal) {
- /* If current thread is queued, than this call is the call that is going to be unblocked. */
+ // If current thread is queued, then this call is
+ // the call that is going to be unblocked.
if (sync.isQueued(Thread.currentThread())) {
-
int waiting = val.getWaiters() - 1;
val.setWaiters(waiting);
@@ -300,7 +322,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
/**
- * NonFair version
+ * NonFair version.
*/
final class NonfairSync extends Sync {
private static final long serialVersionUID = 7983135489326435495L;
@@ -356,12 +378,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
* @param semaphoreView Semaphore projection.
* @param ctx Cache context.
*/
- public GridCacheSemaphoreImpl(String name,
+ public GridCacheSemaphoreImpl(
+ String name,
int initCnt,
boolean fair,
GridCacheInternalKey key,
IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
- GridCacheContext ctx) {
+ GridCacheContext ctx
+ ) {
assert name != null;
assert key != null;
assert semaphoreView != null;
@@ -381,7 +405,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
* @throws IgniteCheckedException If operation failed.
*/
private void initializeSemaphore() throws IgniteCheckedException {
- if (initGuard.compareAndSet(false, true)) {
+ if (!initGuard.get() && initGuard.compareAndSet(false, true)) {
try {
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
@@ -499,6 +523,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** {@inheritDoc} */
@Override public void acquireUninterruptibly(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
@@ -514,10 +539,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
int ret;
try {
initializeSemaphore();
+
ret = CU.outTx(
retryTopologySafe(new Callable<Integer>() {
@Override public Integer call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (
+ IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)
+ ) {
GridCacheSemaphoreState val = semaphoreView.get(key);
if (val == null)
@@ -537,6 +565,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+
return ret;
}
@@ -688,6 +717,4 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override public String toString() {
return S.toString(GridCacheSemaphoreImpl.class, this);
}
-
}
-
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index 1109b53..ce0da0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.datastructures;
import java.io.Externalizable;
@@ -14,19 +31,13 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/** */
private static final long serialVersionUID = 0L;
- /**
- * Permission count.
- */
+ /** Permission count. */
private int count;
- /**
- * Waiter id.
- */
+ /** Number of waiters. */
private int waiters;
- /**
- * Fairness flag.
- */
+ /** Fairness flag. */
private boolean fair;
/**
@@ -72,48 +83,48 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
return count;
}
+ /**
+ * @return Waiters.
+ */
public int getWaiters() {
return waiters;
}
+ /**
+ * @param id Waiters.
+ */
public void setWaiters(int id) {
this.waiters = id;
}
+ /**
+ * @return Fair flag.
+ */
public boolean isFair() {
return fair;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override public Object clone() throws CloneNotSupportedException {
return super.clone();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(count);
out.writeInt(waiters);
out.writeBoolean(fair);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException {
count = in.readInt();
waiters = in.readInt();
fair = in.readBoolean();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheSemaphoreState.class, this);
}
}
-
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
index 977a414..24959ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -60,6 +60,7 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
/** */
private static final Random RND = new Random();
+ /** */
@Rule
public final ExpectedException exception = ExpectedException.none();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
index 059e80a..c302cad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
@@ -30,5 +30,4 @@ public class IgnitePartitionedSemaphoreSelfTest extends IgniteSemaphoreAbstractS
@Override protected CacheMode atomicsCacheMode() {
return PARTITIONED;
}
-
}
[2/6] ignite git commit: Fixes formatting issues;
Posted by yz...@apache.org.
Fixes formatting issues;
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be332a82
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be332a82
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be332a82
Branch: refs/heads/ignite-638
Commit: be332a82711ddd7e9088e00d7f26edd7de407a11
Parents: e9567ad
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Oct 1 20:19:32 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 1 20:19:32 2015 +0200
----------------------------------------------------------------------
.../datastructures/IgniteSemaphoreExample.java | 68 ++--
.../java/org/apache/ignite/IgniteSemaphore.java | 396 +++++++------------
.../datastructures/GridCacheSemaphoreEx.java | 6 +-
.../datastructures/GridCacheSemaphoreImpl.java | 346 +++++++++-------
.../datastructures/GridCacheSemaphoreState.java | 9 +-
.../datastructures/GridCacheSemaphoreValue.java | 115 ------
6 files changed, 396 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 2ef242c..5849f5f 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,22 +1,20 @@
package org.apache.ignite.examples.datastructures;
-import org.apache.ignite.*;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.examples.ExampleNodeStartup;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
/**
- * This example demonstrates cache based semaphore.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-ignite.xml} configuration.
- *
- * @author Vladisav Jelisavcic
+ * This example demonstrates cache based semaphore. <p> Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. <p> Alternatively
+ * you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
*/
public class IgniteSemaphoreExample {
/** Cache name. */
@@ -40,7 +38,7 @@ public class IgniteSemaphoreExample {
System.out.println(">>> Cache atomic semaphore example started.");
// Initialize semaphore.
- IgniteSemaphore syncSemaphore = ignite.semaphore(syncName,0,false,true);
+ IgniteSemaphore syncSemaphore = ignite.semaphore(syncName, 0, false, true);
// Make name of semaphore.
final String semaphoreName = UUID.randomUUID().toString();
@@ -50,21 +48,25 @@ public class IgniteSemaphoreExample {
// Make shared resource
final String resourceName = UUID.randomUUID().toString();
+
+ // Get cache view where the resource will be held
IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+ // Put the resource queue into the cache
cache.put(resourceName, new LinkedList<>());
// Initialize semaphore.
IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
// Initialize mutex.
- IgniteSemaphore mutex = ignite.semaphore(mutexName,1,false,true);
+ IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, false, true);
// Start consumers on all cluster nodes.
for (int i = 0; i < NUM_CONSUMERS; i++)
ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
// Start producers on all cluster nodes.
- for(int i = 0; i < NUM_PRODUCERS; i++)
+ for (int i = 0; i < NUM_PRODUCERS; i++)
ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
System.out.println("Master node is waiting for all other nodes to finish...");
@@ -121,23 +123,33 @@ public class IgniteSemaphoreExample {
/** {@inheritDoc} */
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+ IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
- for(int i=0;i<ITEM_COUNT;i++) {
+ for (int i = 0; i < ITEM_COUNT; i++) {
+ // Mutex is used to access shared resource.
mutex.acquire();
- Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+ Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
queue.add(Ignition.ignite().cluster().localNode().id().toString());
+
Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
+ // Mutex is released for others to access the resource.
mutex.release();
+ // Signals others that shared resource is available.
semaphore.release();
}
System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
- IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+
+ // Gets the syncing semaphore
+ IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+
+ // Signals the master thread
sync.release();
}
}
@@ -159,23 +171,33 @@ public class IgniteSemaphoreExample {
/** {@inheritDoc} */
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+ IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
- for(int i=0;i<ITEM_COUNT;i++) {
+ for (int i = 0; i < ITEM_COUNT; i++) {
+ // Block if queue is empty.
semaphore.acquire();
+ // Mutex is used to access shared resource.
mutex.acquire();
- Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+ Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
String data = queue.remove();
+
Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
+ // Signals others that shared resource is available.
mutex.release();
}
System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
- IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+
+ // Gets the syncing semaphore
+ IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+
+ // Signals the master thread
sync.release();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index 0e29d00..5a4b377 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -4,17 +4,12 @@ import java.io.Closeable;
import java.util.concurrent.TimeUnit;
/**
- * This interface provides a rich API for working with distributed semaphore.
- * <p>
- * <h1 class="header">Functionality</h1>
- * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
- * <h1 class="header">Creating Distributed Semaphore</h1>
- * Instance of cache semaphore can be created by calling the following method:
- * {@link Ignite#semaphore(String, int, boolean, boolean)}.
- *
- * @author Vladisav Jelisavcic
+ * This interface provides a rich API for working with distributed semaphore. <p> <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. <h1
+ * class="header">Creating Distributed Semaphore</h1> Instance of cache semaphore can be created by calling the
+ * following method: {@link Ignite#semaphore(String, int, boolean, boolean)}.
*/
-public interface IgniteSemaphore extends Closeable{
+public interface IgniteSemaphore extends Closeable {
/**
* Gets name of the semaphore.
*
@@ -23,151 +18,110 @@ public interface IgniteSemaphore extends Closeable{
public String name();
/**
- * Acquires a permit from this semaphore, blocking until one is
- * available, or the thread is {@linkplain Thread#interrupt interrupted}.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * one of two things happens:
- * <ul>
- * <li>Some other thread invokes the {@link #release} method for this
- * semaphore and the current thread is next to be assigned a permit; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread.
- * </ul>
- *
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * for a permit,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
+ * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain
+ * Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+ * one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this
+ * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+ * Thread#interrupt interrupts} the current thread. </ul>
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+ * and the current thread's interrupted status is cleared.
*
* @throws IgniteInterruptedException if the current thread is interrupted
*/
public void acquire() throws IgniteInterruptedException;
/**
- * Acquires a permit from this semaphore, blocking until one is
- * available.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * some other thread invokes the {@link #release} method for this
- * semaphore and the current thread is next to be assigned a permit.
- *
- * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
- * while waiting for a permit then it will continue to wait, but the
- * time at which the thread is assigned a permit may change compared to
- * the time it would have received the permit had no interruption
- * occurred. When the thread does return from this method its interrupt
- * status will be set.
+ * Acquires a permit from this semaphore, blocking until one is available.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+ * one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is
+ * next to be assigned a permit.
+ *
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will
+ * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would
+ * have received the permit had no interruption occurred. When the thread does return from this method its
+ * interrupt status will be set.
*/
public void acquireUninterruptibly();
/**
- * Acquires a permit from this semaphore, only if one is available at the
- * time of invocation.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * with the value {@code true},
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then this method will return
- * immediately with the value {@code false}.
- *
- * <p>Even when this semaphore has been set to use a
- * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
- * immediately acquire a permit if one is available, whether or not
- * other threads are currently waiting.
- * This "barging" behavior can be useful in certain
- * circumstances, even though it breaks fairness. If you want to honor
- * the fairness setting, then use
- * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
- * which is almost equivalent (it also detects interruption).
- *
- * @return {@code true} if a permit was acquired and {@code false}
- * otherwise
+ * Acquires a permit from this semaphore, only if one is available at the time of invocation.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+ * number of available permits by one.
+ *
+ * <p>If no permit is available then this method will return immediately with the value {@code false}.
+ *
+ * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire()}
+ * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+ * waiting. This "barging" behavior can be useful in certain circumstances, even though it breaks
+ * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(long, TimeUnit) tryAcquire(0,
+ * TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
+ *
+ * @return {@code true} if a permit was acquired and {@code false} otherwise
*/
public boolean tryAcquire();
/**
- * Acquires a permit from this semaphore, if one becomes available
- * within the given waiting time and the current thread has not
- * been {@linkplain Thread#interrupt interrupted}.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * with the value {@code true},
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * one of three things happens:
- * <ul>
- * <li>Some other thread invokes the {@link #release} method for this
- * semaphore and the current thread is next to be assigned a permit; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread; or
- * <li>The specified waiting time elapses.
- * </ul>
+ * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current
+ * thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+ * number of available permits by one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for
+ * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+ * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul>
*
* <p>If a permit is acquired then the value {@code true} is returned.
*
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * to acquire a permit,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is
+ * thrown and the current thread's interrupted status is cleared.
*
- * <p>If the specified waiting time elapses then the value {@code false}
- * is returned. If the time is less than or equal to zero, the method
- * will not wait at all.
+ * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all.
*
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
- * @return {@code true} if a permit was acquired and {@code false}
- * if the waiting time elapsed before a permit was acquired
+ * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was
+ * acquired
* @throws IgniteInterruptedException if the current thread is interrupted
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
- throws IgniteInterruptedException;
+ throws IgniteInterruptedException;
/**
- * Acquires the given number of permits from this semaphore,
- * blocking until all are available.
+ * Acquires the given number of permits from this semaphore, blocking until all are available.
*
- * <p>Acquires the given number of permits, if they are available,
- * and returns immediately, reducing the number of available permits
- * by the given amount.
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+ * available permits by the given amount.
*
- * <p>If insufficient permits are available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * some other thread invokes one of the {@link #release() release}
- * methods for this semaphore, the current thread is next to be assigned
- * permits and the number of available permits satisfies this request.
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this
+ * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this
+ * request.
*
- * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
- * while waiting for permits then it will continue to wait and its
- * position in the queue is not affected. When the thread does return
- * from this method its interrupt status will be set.
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will
+ * continue to wait and its position in the queue is not affected. When the thread does return from this method its
+ * interrupt status will be set.
*
* @param permits the number of permits to acquire
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void acquireUninterruptibly(int permits);
-
/**
* Returns the current number of permits available in this semaphore.
*
@@ -187,131 +141,91 @@ public interface IgniteSemaphore extends Closeable{
/**
* Releases a permit, returning it to the semaphore.
*
- * <p>Releases a permit, increasing the number of available permits by
- * one. If any threads are trying to acquire a permit, then one is
- * selected and given the permit that was just released. That thread
- * is (re)enabled for thread scheduling purposes.
+ * <p>Releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a
+ * permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread
+ * scheduling purposes.
*
- * <p>There is no requirement that a thread that releases a permit must
- * have acquired that permit by calling {@link #acquire}.
- * Correct usage of a semaphore is established by programming convention
- * in the application.
+ * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+ * #acquire}. Correct usage of a semaphore is established by programming convention in the application.
*/
public void release();
/**
- * Acquires the given number of permits from this semaphore, if all
- * become available within the given waiting time and the current
- * thread has not been {@linkplain Thread#interrupt interrupted}.
+ * Acquires the given number of permits from this semaphore, if all become available within the given waiting time
+ * and the current thread has not been {@linkplain Thread#interrupt interrupted}.
*
- * <p>Acquires the given number of permits, if they are available and
- * returns immediately, with the value {@code true},
- * reducing the number of available permits by the given amount.
- *
- * <p>If insufficient permits are available then
- * the current thread becomes disabled for thread scheduling
- * purposes and lies dormant until one of three things happens:
- * <ul>
- * <li>Some other thread invokes one of the {@link #release() release}
- * methods for this semaphore, the current thread is next to be assigned
- * permits and the number of available permits satisfies this request; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread; or
- * <li>The specified waiting time elapses.
- * </ul>
+ * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code
+ * true}, reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link
+ * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number
+ * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread; or <li>The specified waiting time elapses. </ul>
*
* <p>If the permits are acquired then the value {@code true} is returned.
*
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * to acquire the permits,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * Any permits that were to be assigned to this thread, are instead
- * assigned to other threads trying to acquire permits, as if
- * the permits had been made available by a call to {@link #release()}.
- *
- * <p>If the specified waiting time elapses then the value {@code false}
- * is returned. If the time is less than or equal to zero, the method
- * will not wait at all. Any permits that were to be assigned to this
- * thread, are instead assigned to other threads trying to acquire
- * permits, as if the permits had been made available by a call to
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException}
+ * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this
+ * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made
+ * available by a call to {@link #release()}.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all. Any permits that were to be assigned to this thread, are instead
+ * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to
* {@link #release()}.
*
* @param permits the number of permits to acquire
* @param timeout the maximum time to wait for the permits
* @param unit the time unit of the {@code timeout} argument
- * @return {@code true} if all permits were acquired and {@code false}
- * if the waiting time elapsed before all permits were acquired
+ * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all
+ * permits were acquired
* @throws IgniteInterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- throws IgniteInterruptedException;
+ throws IgniteInterruptedException;
/**
- * Acquires the given number of permits from this semaphore, only
- * if all are available at the time of invocation.
- *
- * <p>Acquires the given number of permits, if they are available, and
- * returns immediately, with the value {@code true},
- * reducing the number of available permits by the given amount.
- *
- * <p>If insufficient permits are available then this method will return
- * immediately with the value {@code false} and the number of available
- * permits is unchanged.
- *
- * <p>Even when this semaphore has been set to use a fair ordering
- * policy, a call to {@code tryAcquire} <em>will</em>
- * immediately acquire a permit if one is available, whether or
- * not other threads are currently waiting. This
- * "barging" behavior can be useful in certain
- * circumstances, even though it breaks fairness. If you want to
- * honor the fairness setting, then use {@link #tryAcquire(int,
- * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
- * which is almost equivalent (it also detects interruption).
+ * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code
+ * true}, reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then this method will return immediately with the value {@code false}
+ * and the number of available permits is unchanged.
+ *
+ * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire}
+ * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+ * waiting. This "barging" behavior can be useful in certain circumstances, even though it breaks
+ * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(int, long, TimeUnit)
+ * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
*
* @param permits the number of permits to acquire
- * @return {@code true} if the permits were acquired and
- * {@code false} otherwise
+ * @return {@code true} if the permits were acquired and {@code false} otherwise
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits);
/**
- * Acquires the given number of permits from this semaphore,
- * blocking until all are available,
- * or the thread is {@linkplain Thread#interrupt interrupted}.
- *
- * <p>Acquires the given number of permits, if they are available,
- * and returns immediately, reducing the number of available permits
- * by the given amount.
- *
- * <p>If insufficient permits are available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * one of two things happens:
- * <ul>
- * <li>Some other thread invokes one of the {@link #release() release}
- * methods for this semaphore, the current thread is next to be assigned
- * permits and the number of available permits satisfies this request; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread.
- * </ul>
- *
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * for a permit,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * Any permits that were to be assigned to this thread are instead
- * assigned to other threads trying to acquire permits, as if
- * permits had been made available by a call to {@link #release()}.
+ * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is
+ * {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+ * available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release()
+ * release} methods for this semaphore, the current thread is next to be assigned permits and the number of
+ * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+ * current thread. </ul>
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+ * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are
+ * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to
+ * {@link #release()}.
*
* @param permits the number of permits to acquire
* @throws IgniteInterruptedException if the current thread is interrupted
@@ -322,21 +236,16 @@ public interface IgniteSemaphore extends Closeable{
/**
* Releases the given number of permits, returning them to the semaphore.
*
- * <p>Releases the given number of permits, increasing the number of
- * available permits by that amount.
- * If any threads are trying to acquire permits, then one
- * is selected and given the permits that were just released.
- * If the number of available permits satisfies that thread's request
- * then that thread is (re)enabled for thread scheduling purposes;
- * otherwise the thread will wait until sufficient permits are available.
- * If there are still permits available
- * after this thread's request has been satisfied, then those permits
- * are assigned in turn to other threads trying to acquire permits.
- *
- * <p>There is no requirement that a thread that releases a permit must
- * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
- * Correct usage of a semaphore is established by programming convention
- * in the application.
+ * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
+ * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the
+ * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling
+ * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits
+ * available after this thread's request has been satisfied, then those permits are assigned in turn to other
+ * threads trying to acquire permits.
+ *
+ * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+ * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the
+ * application.
*
* @param permits the number of permits to release
* @throws IllegalArgumentException if {@code permits} is negative
@@ -351,23 +260,18 @@ public interface IgniteSemaphore extends Closeable{
public boolean isFair();
/**
- * Queries whether any threads are waiting to acquire. Note that
- * because cancellations may occur at any time, a {@code true}
- * return does not guarantee that any other thread will ever
- * acquire. This method is designed primarily for use in
- * monitoring of the system state.
- *
- * @return {@code true} if there may be other threads waiting to
- * acquire the lock
+ * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a
+ * {@code true} return does not guarantee that any other thread will ever acquire. This method is designed
+ * primarily for use in monitoring of the system state.
+ *
+ * @return {@code true} if there may be other threads waiting to acquire permits
*/
public boolean hasQueuedThreads();
/**
- * Returns an estimate of the number of threads waiting to acquire.
- * The value is only an estimate because the number of threads may
- * change dynamically while this method traverses internal data
- * structures. This method is designed for use in monitoring of the
- * system state, not for synchronization control.
+ * Returns an estimate of the number of threads waiting to acquire. The value is only an estimate because the number
+ * of threads may change dynamically while this method traverses internal data structures. This method is designed
+ * for use in monitoring of the system state, not for synchronization control.
*
* @return the estimated number of threads waiting for this lock
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 0f939d5..8ecbcc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -3,7 +3,7 @@ package org.apache.ignite.internal.processors.datastructures;
import org.apache.ignite.IgniteSemaphore;
/**
- * Created by vladisav on 20.9.15..
+ * Grid cache semaphore ({@code 'Ex'} stands for external).
*/
public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
/**
@@ -16,7 +16,7 @@ public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovabl
/**
* Callback to notify semaphore on changes.
*
- * @param val Id of the caller and number of permissions to acquire (or release; can be negative).
+ * @param val State containing the number of available permissions.
*/
public void onUpdate(GridCacheSemaphoreState val);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 24c3ec5..17efc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -1,6 +1,20 @@
package org.apache.ignite.internal.processors.datastructures;
-import org.apache.ignite.*;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -11,26 +25,16 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
- * Cache semaphore implementation based on AbstractQueuedSynchronizer.
- * Current implementation supports only unfair and locally fair modes.
- * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
- * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
- * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
- *
- * @author Vladisav Jelisavcic
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer. Current implementation supports only unfair and
+ * locally fair modes. When fairness is set to false, this class makes no guarantees about the order in which threads
+ * acquire permits. When fairness is set to true, the semaphore only guarantees that local threads invoking any of the acquire
+ * methods are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
*/
public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
/** */
@@ -38,11 +42,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
- new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
- @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
- return F.t2();
- }
- };
+ new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+ @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+ return F.t2();
+ }
+ };
/** Logger. */
private IgniteLogger log;
@@ -50,7 +54,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** Semaphore name. */
private String name;
- /** Removed flag.*/
+ /** Removed flag. */
private volatile boolean rmvd;
/** Semaphore key. */
@@ -86,14 +90,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
/**
- * Synchronization implementation for semaphore. Uses AQS state
- * to represent permits. Subclassed into fair and nonfair
- * versions.
+ * Synchronization implementation for semaphore. Uses AQS state to represent permits. Subclassed into fair and
+ * nonfair versions.
*/
abstract class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
- protected ConcurrentMap<Thread,Integer> threadMap;
+ protected final ConcurrentMap<Thread, Integer> threadMap;
protected int totalWaiters;
Sync(int permits) {
@@ -101,7 +104,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
threadMap = new ConcurrentHashMap<>();
}
- protected synchronized void setWaiters(int waiters){
+ protected synchronized void setWaiters(int waiters) {
totalWaiters = waiters;
}
@@ -109,7 +112,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return totalWaiters;
}
- final synchronized void setPermits(int permits){
+ final synchronized void setPermits(int permits) {
setState(permits);
}
@@ -118,13 +121,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
+ for (; ; ) {
int available = getState();
+
int remaining = available - acquires;
if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
- if(remaining < 0){
- if(!threadMap.containsKey(Thread.currentThread()))
+ if (remaining < 0) {
+ if (!threadMap.containsKey(Thread.currentThread()))
getAndIncWaitingCount();
}
@@ -136,33 +140,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
protected final boolean tryReleaseShared(int releases) {
// Check if some other node updated the state.
// This method is called with release==0 only when trying to wake through update.
- if(releases == 0)
+ if (releases == 0)
return true;
- for (;;) {
+ for (; ; ) {
int current = getState();
+
int next = current + releases;
+
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
+
if (compareAndSetGlobalState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
- for (;;) {
+ for (; ; ) {
int current = getState();
+
int next = current - reductions;
+
if (next > current) // underflow
throw new Error("Permit count underflow");
+
if (compareAndSetGlobalState(current, next))
return;
}
}
final int drainPermits() {
- for (;;) {
+ for (; ; ) {
+
int current = getState();
+
if (current == 0 || compareAndSetGlobalState(current, 0))
return current;
}
@@ -171,34 +183,40 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
protected void getAndIncWaitingCount() {
try {
CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null)
- throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
- int waiting = val.getWaiters();
- sync.threadMap.put(Thread.currentThread(), waiting);
+ int waiting = val.getWaiters();
- waiting++;
- val.setWaiters(waiting);
- semaphoreView.put(key, val);
- tx.commit();
+ sync.threadMap.put(Thread.currentThread(), waiting);
- return true;
- } catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ waiting++;
- throw e;
- }
+ val.setWaiters(waiting);
+
+ semaphoreView.put(key, val);
+
+ tx.commit();
+
+ return true;
+ }
+ catch (Error | Exception e) {
+ U.error(log, "Failed to compare and set: " + this, e);
+
+ throw e;
}
- }),
- ctx
+ }
+ }),
+ ctx
);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -206,44 +224,48 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
try {
return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null)
- throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
- boolean retVal = val.getCnt() == expVal;
+ boolean retVal = val.getCnt() == expVal;
- if (retVal) {
+ if (retVal) {
/* If current thread is queued, than this call is the call that is going to be unblocked. */
- if(sync.isQueued(Thread.currentThread())) {
-
- int waiting = val.getWaiters() - 1;
- val.setWaiters(waiting);
+ if (sync.isQueued(Thread.currentThread())) {
- sync.threadMap.remove(Thread.currentThread());
- }
+ int waiting = val.getWaiters() - 1;
- val.setCnt(newVal);
+ val.setWaiters(waiting);
- semaphoreView.put(key, val);
- tx.commit();
+ sync.threadMap.remove(Thread.currentThread());
}
- return retVal;
- } catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ val.setCnt(newVal);
+
+ semaphoreView.put(key, val);
- throw e;
+ tx.commit();
}
+
+ return retVal;
+ }
+ catch (Error | Exception e) {
+ U.error(log, "Failed to compare and set: " + this, e);
+
+ throw e;
}
- }),
- ctx
+ }
+ }),
+ ctx
);
- } catch( IgniteCheckedException e){
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -275,16 +297,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
protected int tryAcquireShared(int acquires) {
- for (;;) {
+ for (; ; ) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
+
int remaining = available - acquires;
if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
- if(remaining < 0){
- if(!threadMap.containsKey(Thread.currentThread()))
+ if (remaining < 0) {
+ if (!threadMap.containsKey(Thread.currentThread()))
getAndIncWaitingCount();
}
return remaining;
@@ -292,7 +315,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
-
}
/**
@@ -305,12 +327,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
* @param ctx Cache context.
*/
public GridCacheSemaphoreImpl(String name,
- int initCnt,
- boolean fair,
- GridCacheInternalKey key,
- IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
- GridCacheContext ctx)
- {
+ int initCnt,
+ boolean fair,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+ GridCacheContext ctx) {
assert name != null;
assert key != null;
assert semaphoreView != null;
@@ -333,28 +354,30 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (initGuard.compareAndSet(false, true)) {
try {
sync = CU.outTx(
- retryTopologySafe(new Callable<Sync>() {
- @Override
- public Sync call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Sync>() {
+ @Override
+ public Sync call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find semaphore with given name: " + name);
+ if (val == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find semaphore with given name: " + name);
- return null;
- }
+ return null;
+ }
- final int count = val.getCnt();
- tx.commit();
+ final int count = val.getCnt();
- return val.isFair() ? new FairSync(count) : new NonfairSync(count);
- }
+ tx.commit();
+
+ return val.isFair() ? new FairSync(count) : new NonfairSync(count);
}
- }),
- ctx
+ }
+ }),
+ ctx
);
+
if (log.isDebugEnabled())
log.debug("Initialized internal sync structure: " + sync);
}
@@ -370,17 +393,20 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
-
/** {@inheritDoc} */
@Override public String name() {
return name;
}
/** {@inheritDoc} */
- @Override public GridCacheInternalKey key() { return key; }
+ @Override public GridCacheInternalKey key() {
+ return key;
+ }
/** {@inheritDoc} */
- @Override public boolean removed(){ return rmvd; }
+ @Override public boolean removed() {
+ return rmvd;
+ }
/** {@inheritDoc} */
@Override public boolean onRemoved() {
@@ -389,7 +415,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** {@inheritDoc} */
@Override public void onUpdate(GridCacheSemaphoreState val) {
- if(sync == null)
+ if (sync == null)
return;
// Update permission count.
@@ -415,25 +441,28 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override
public void acquire(int permits) throws IgniteInterruptedException {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
- sync.acquireSharedInterruptibly(permits);
- } catch (IgniteCheckedException e) {
+ sync.acquireSharedInterruptibly(permits);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
-
@Override
public void acquireUninterruptibly() {
try {
initializeSemaphore();
- sync.acquireShared(1);
- } catch (IgniteCheckedException e) {
+ sync.acquireShared(1);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -443,38 +472,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
- sync.acquireShared(permits);
- } catch (IgniteCheckedException e) {
+ sync.acquireShared(permits);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@Override
- public int availablePermits(){
+ public int availablePermits() {
int ret;
try {
initializeSemaphore();
ret = CU.outTx(
- retryTopologySafe(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null)
- throw new IgniteException("Failed to find semaphore with given name: " + name);
+ if (val == null)
+ throw new IgniteException("Failed to find semaphore with given name: " + name);
- int count = val.getCnt();
- tx.rollback();
+ int count = val.getCnt();
- return count;
- }
+ tx.rollback();
+
+ return count;
}
- }),
- ctx
+ }
+ }),
+ ctx
);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
return ret;
@@ -484,9 +516,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public int drainPermits() {
try {
initializeSemaphore();
- return sync.drainPermits();
- } catch (IgniteCheckedException e) {
+ return sync.drainPermits();
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -495,9 +528,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public boolean tryAcquire() {
try {
initializeSemaphore();
- return sync.nonfairTryAcquireShared(1) >= 0;
- } catch (IgniteCheckedException e) {
+ return sync.nonfairTryAcquireShared(1) >= 0;
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -506,11 +540,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
try {
initializeSemaphore();
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- } catch (IgniteCheckedException e) {
+ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
@@ -523,11 +559,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override
public void release(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
- sync.releaseShared(permits);
- } catch (IgniteCheckedException e) {
+ sync.releaseShared(permits);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -535,11 +573,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override
public boolean tryAcquire(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
- return sync.nonfairTryAcquireShared(permits) >= 0;
- } catch (IgniteCheckedException e) {
+ return sync.nonfairTryAcquireShared(permits) >= 0;
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -549,11 +589,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
- return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
- } catch (IgniteCheckedException e) {
+ return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
@@ -567,8 +609,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public boolean hasQueuedThreads() {
try {
initializeSemaphore();
- return sync.getWaiters()!=0;
- } catch (IgniteCheckedException e) {
+
+ return sync.getWaiters() != 0;
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -577,8 +621,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public int getQueueLength() {
try {
initializeSemaphore();
+
return sync.getWaiters();
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index cf44b7d..a02b7c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -1,18 +1,14 @@
package org.apache.ignite.internal.processors.datastructures;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Grid cache semaphore state.
- *
- * @author Vladisav Jelisavcic
*/
public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
/** */
@@ -33,7 +29,6 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
*/
private boolean fair;
-
/**
* Constructor.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
deleted file mode 100644
index 689b647..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.ignite.internal.processors.datastructures;
-
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-/**
- * Created by vladisav on 20.9.15..
- */
-public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Permission count.
- */
- private int cnt;
-
- /**
- * Semaphore ID.
- */
- private long semaphoreId;
-
- /**
- * Constructor.
- *
- * @param cnt Number of permissions.
- * @param
- */
- public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
- this.cnt = cnt;
-
- this.semaphoreId = semaphoreId;
- }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridCacheSemaphoreValue() {
- // No-op.
- }
-
- /**
- * @param cnt New count.
- */
- public void set(int cnt) {
- this.cnt = cnt;
- }
-
- /**
- * @return Current count.
- */
- public int get() {
- return cnt;
- }
-
- /**
- * @return true if number of permissions to be added is positive
- */
- public boolean isRelease(){
- return cnt>0;
- }
-
- /**
- * @return true if permission count should be lowered
- */
- public boolean isAwait(){
- return cnt<0;
- }
-
- /**
- * @return Semaphore ID.
- */
- public long semaphoreId() {
- return semaphoreId;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Object clone() throws CloneNotSupportedException {
- return super.clone();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(cnt);
- out.writeLong(semaphoreId);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void readExternal(ObjectInput in) throws IOException {
- cnt = in.readInt();
- semaphoreId = in.readLong();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString() {
- return S.toString(GridCacheSemaphoreValue.class, this);
- }
-}
\ No newline at end of file
[4/6] ignite git commit: Adds test coverage; Simplifies example;
Posted by yz...@apache.org.
Adds test coverage; Simplifies example;
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c6852d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c6852d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c6852d0
Branch: refs/heads/ignite-638
Commit: 8c6852d07481c3943584f676b48b8c938342e640
Parents: 77f9deb
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Oct 15 01:17:02 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 15 01:17:02 2015 +0200
----------------------------------------------------------------------
.../datastructures/IgniteSemaphoreExample.java | 78 +----
.../datastructures/DataStructuresProcessor.java | 26 +-
.../datastructures/GridCacheSemaphoreImpl.java | 12 +-
.../datastructures/GridCacheSemaphoreState.java | 28 +-
.../IgniteClientReconnectAtomicsTest.java | 44 ++-
.../IgniteClientDataStructuresAbstractTest.java | 59 +++-
.../IgniteDataStructureUniqueNameTest.java | 14 +-
.../IgniteSemaphoreAbstractSelfTest.java | 328 +++++++++++++++++++
.../local/IgniteLocalSemaphoreSelfTest.java | 98 ++++++
.../IgnitePartitionedSemaphoreSelfTest.java | 34 ++
.../IgniteReplicatedSemaphoreSelfTest.java | 33 ++
.../cache/GridCacheDataStructuresLoadTest.java | 283 +++++++++-------
12 files changed, 812 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 5849f5f..ece0ffc 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,10 +1,7 @@
package org.apache.ignite.examples.datastructures;
-import java.util.LinkedList;
-import java.util.Queue;
import java.util.UUID;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.Ignition;
import org.apache.ignite.examples.ExampleNodeStartup;
@@ -43,31 +40,16 @@ public class IgniteSemaphoreExample {
// Make name of semaphore.
final String semaphoreName = UUID.randomUUID().toString();
- // Make name of mutex
- final String mutexName = UUID.randomUUID().toString();
-
- // Make shared resource
- final String resourceName = UUID.randomUUID().toString();
-
- // Get cache view where the resource will be held
- IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
-
- // Put the resource queue the cache
- cache.put(resourceName, new LinkedList<>());
-
// Initialize semaphore.
IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
- // Initialize mutex.
- IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, false, true);
-
// Start consumers on all cluster nodes.
for (int i = 0; i < NUM_CONSUMERS; i++)
- ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
+ ignite.compute().withAsync().run(new Consumer(semaphoreName));
// Start producers on all cluster nodes.
for (int i = 0; i < NUM_PRODUCERS; i++)
- ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
+ ignite.compute().withAsync().run(new Producer(semaphoreName));
System.out.println("Master node is waiting for all other nodes to finish...");
@@ -88,21 +70,11 @@ public class IgniteSemaphoreExample {
/** Semaphore name. */
protected final String semaphoreName;
- /** Mutex name. */
- protected final String mutexName;
-
- /** Resource name. */
- protected final String resourceName;
-
/**
- * @param mutexName Mutex name.
* @param semaphoreName Semaphore name.
- * @param resourceName Resource name.
*/
- SemaphoreExampleClosure(String mutexName, String semaphoreName, String resourceName) {
+ SemaphoreExampleClosure(String semaphoreName) {
this.semaphoreName = semaphoreName;
- this.mutexName = mutexName;
- this.resourceName = resourceName;
}
}
@@ -112,33 +84,18 @@ public class IgniteSemaphoreExample {
private static class Producer extends SemaphoreExampleClosure {
/**
- * @param mutexName Mutex name.
* @param semaphoreName Semaphore name.
- * @param resourceName Resource name.
*/
- public Producer(String mutexName, String semaphoreName, String resourceName) {
- super(mutexName, semaphoreName, resourceName);
+ public Producer(String semaphoreName) {
+ super(semaphoreName);
}
/** {@inheritDoc} */
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
for (int i = 0; i < ITEM_COUNT; i++) {
- // Mutex is used to access shared resource.
- mutex.acquire();
-
- Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
-
- queue.add(Ignition.ignite().cluster().localNode().id().toString());
-
- Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
-
- System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
-
- // Mutex is released for others to access the resource.
- mutex.release();
+ System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
// Signals others that shared resource is available.
semaphore.release();
@@ -160,36 +117,21 @@ public class IgniteSemaphoreExample {
private static class Consumer extends SemaphoreExampleClosure {
/**
- * @param mutexName Mutex name.
* @param semaphoreName Semaphore name.
- * @param resourceName Resource name.
*/
- public Consumer(String mutexName, String semaphoreName, String resourceName) {
- super(mutexName, semaphoreName, resourceName);
+ public Consumer(String semaphoreName) {
+ super(semaphoreName);
}
/** {@inheritDoc} */
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
for (int i = 0; i < ITEM_COUNT; i++) {
- // Block if queue is empty.
+ // Block if no permits are available.
semaphore.acquire();
- // Mutex is used to access shared resource.
- mutex.acquire();
-
- Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
-
- String data = queue.remove();
-
- Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
-
- System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
-
- // Signals others that shared resource is available.
- mutex.release();
+ System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
}
System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 220dec2..b97b24f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1195,19 +1195,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* {@code create} is false.
* @throws IgniteCheckedException If operation failed.
*/
- public IgniteSemaphore semaphore(final String name,
- final int cnt,
- final boolean fair,
- final boolean create)
- throws IgniteCheckedException
- {
+ public IgniteSemaphore semaphore(final String name, final int cnt, final boolean fair, final boolean create)
+ throws IgniteCheckedException {
A.notNull(name, "name");
awaitInitialization();
- if (create)
- A.ensure(cnt >= 0, "count can not be negative");
-
checkAtomicsConfiguration();
startQuery();
@@ -1234,12 +1227,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return null;
if (val == null) {
- val = new GridCacheSemaphoreState(cnt, 0);
+ val = new GridCacheSemaphoreState(cnt, 0, fair);
dsView.put(key, val);
}
- semaphore = new GridCacheSemaphoreImpl(name, val.getCnt(),
+ semaphore = new GridCacheSemaphoreImpl(name, val.getCount(),
fair,
key,
semaphoreView,
@@ -1278,19 +1271,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
awaitInitialization();
removeDataStructure(new IgniteOutClosureX<Void>() {
- @Override
- public Void applyx() throws IgniteCheckedException {
+ @Override public Void applyx() throws IgniteCheckedException {
GridCacheInternal key = new GridCacheInternalKeyImpl(name);
dsCacheCtx.gate().enter();
try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
- GridCacheSemaphoreState val =
- cast(dsView.get(key), GridCacheSemaphoreState.class);
+ GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
if (val != null) {
- if (val.getCnt() < 0) {
+ if (val.getCount() < 0) {
throw new IgniteCheckedException("Failed to remove semaphore " +
"with blocked threads. ");
}
@@ -1318,8 +1309,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If removing failed or class of object is different to expected class.
*/
private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
- return CU.outTx(
- new Callable<Boolean>() {
+ return CU.outTx(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index d2a966f..78d923a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -261,7 +261,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (val == null)
throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
- boolean retVal = val.getCnt() == expVal;
+ boolean retVal = val.getCount() == expVal;
if (retVal) {
/* If current thread is queued, then this call is the call that is going to be unblocked. */
@@ -274,7 +274,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.threadMap.remove(Thread.currentThread());
}
- val.setCnt(newVal);
+ val.setCount(newVal);
semaphoreView.put(key, val);
@@ -396,7 +396,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return null;
}
- final int count = val.getCnt();
+ final int count = val.getCount();
tx.commit();
@@ -448,7 +448,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return;
// Update permission count.
- sync.setPermits(val.getCnt());
+ sync.setPermits(val.getCount());
// Update waiters count.
sync.setWaiters(val.getWaiters());
@@ -463,7 +463,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
/** {@inheritDoc} */
- @Override public void acquire() throws IgniteException {
+ @Override public void acquire() throws IgniteInterruptedException {
acquire(1);
}
@@ -523,7 +523,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (val == null)
throw new IgniteException("Failed to find semaphore with given name: " + name);
- int count = val.getCnt();
+ int count = val.getCount();
tx.rollback();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index e25649e..1109b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -17,7 +17,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* Permission count.
*/
- private int cnt;
+ private int count;
/**
* Waiter id.
@@ -32,10 +32,10 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* Constructor.
*
- * @param cnt Number of permissions.
+ * @param count Number of permissions.
*/
- public GridCacheSemaphoreState(int cnt, int waiters) {
- this.cnt = cnt;
+ public GridCacheSemaphoreState(int count, int waiters) {
+ this.count = count;
this.waiters = waiters;
this.fair = false;
}
@@ -43,10 +43,10 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* Constructor.
*
- * @param cnt Number of permissions.
+ * @param count Number of permissions.
*/
- public GridCacheSemaphoreState(int cnt, int waiters, boolean fair) {
- this.cnt = cnt;
+ public GridCacheSemaphoreState(int count, int waiters, boolean fair) {
+ this.count = count;
this.waiters = waiters;
this.fair = fair;
}
@@ -59,17 +59,17 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
}
/**
- * @param cnt New count.
+ * @param count New count.
*/
- public void setCnt(int cnt) {
- this.cnt = cnt;
+ public void setCount(int count) {
+ this.count = count;
}
/**
* @return Current count.
*/
- public int getCnt() {
- return cnt;
+ public int getCount() {
+ return count;
}
public int getWaiters() {
@@ -95,7 +95,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
* {@inheritDoc}
*/
@Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(cnt);
+ out.writeInt(count);
out.writeInt(waiters);
out.writeBoolean(fair);
}
@@ -104,7 +104,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
* {@inheritDoc}
*/
@Override public void readExternal(ObjectInput in) throws IOException {
- cnt = in.readInt();
+ count = in.readInt();
waiters = in.readInt();
fair = in.readBoolean();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 55dbb57..c46b5c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.testframework.GridTestUtils;
@@ -675,4 +676,45 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(srvLatch.await(1000));
assertTrue(clientLatch.await(1000));
}
-}
\ No newline at end of file
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true);
+
+ assertEquals(3, clientSemaphore.availablePermits());
+
+ final IgniteSemaphore srvSemaphore = srv.semaphore("semaphore1", 3, false, false);
+
+ assertEquals(3, srvSemaphore.availablePermits());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvSemaphore.acquire();
+ }
+ });
+
+ assertEquals(2, srvSemaphore.availablePermits());
+ assertEquals(2, clientSemaphore.availablePermits());
+
+ srvSemaphore.acquire();
+
+ assertEquals(1, srvSemaphore.availablePermits());
+ assertEquals(1, clientSemaphore.availablePermits());
+
+ clientSemaphore.acquire();
+
+ assertEquals(0, srvSemaphore.availablePermits());
+ assertEquals(0, clientSemaphore.availablePermits());
+
+ assertFalse(srvSemaphore.tryAcquire());
+ assertFalse(srvSemaphore.tryAcquire());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index 989f75f..bf6dcda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -267,6 +268,62 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
/**
* @throws Exception If failed.
*/
+ public void testSemaphore() throws Exception {
+ Ignite clientNode = clientIgnite();
+ Ignite srvNode = serverNode();
+
+ testSemaphore(clientNode, srvNode);
+ testSemaphore(srvNode, clientNode);
+ }
+
+ /**
+ * @param creator Creator node.
+ * @param other Other node.
+ * @throws Exception If failed.
+ */
+ private void testSemaphore(Ignite creator, final Ignite other) throws Exception {
+ assertNull(creator.semaphore("semaphore1", 1, true, false));
+ assertNull(other.semaphore("semaphore1", 1, true, false));
+
+ try (IgniteSemaphore semaphore = creator.semaphore("semaphore1", -1, true, true)) {
+ assertNotNull(semaphore);
+
+ assertEquals(-1, semaphore.availablePermits());
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.sleep(1000);
+
+ IgniteSemaphore semaphore0 = other.semaphore("semaphore1", -1, true, false);
+
+ assertEquals(-1, semaphore0.availablePermits());
+
+ log.info("Release semaphore.");
+
+ semaphore0.release(2);
+
+ return null;
+ }
+ });
+
+ log.info("Acquire semaphore.");
+
+ assertTrue(semaphore.tryAcquire(1, 5000, TimeUnit.MILLISECONDS));
+
+ log.info("Finished wait.");
+
+ fut.get();
+
+ assertEquals(0, semaphore.availablePermits());
+ }
+
+ assertNull(creator.semaphore("semaphore1", 1, true, false));
+ assertNull(other.semaphore("semaphore1", 1, true, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testQueue() throws Exception {
Ignite clientNode = clientIgnite();
Ignite srvNode = serverNode();
@@ -343,4 +400,4 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
return ignite;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
index f5305a2..4a21765 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
@@ -239,7 +240,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
private void testUniqueName(final boolean singleGrid) throws Exception {
final String name = IgniteUuid.randomUuid().toString();
- final int DS_TYPES = 7;
+ final int DS_TYPES = 8;
final int THREADS = DS_TYPES * 3;
@@ -314,6 +315,12 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
break;
+ case 7:
+ log.info("Create atomic semaphore, grid: " + ignite.name());
+
+ res = ignite.semaphore(name, 0, false, true);
+
+ break;
default:
fail();
@@ -352,7 +359,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
res instanceof IgniteAtomicStamped ||
res instanceof IgniteCountDownLatch ||
res instanceof IgniteQueue ||
- res instanceof IgniteSet);
+ res instanceof IgniteSet ||
+ res instanceof IgniteSemaphore);
log.info("Data structure created: " + dataStructure);
@@ -371,4 +379,4 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
dataStructure.close();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
new file mode 100644
index 0000000..977a414
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+/**
+ * Cache semaphore self test.
+ */
+public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstractTest
+ implements Externalizable {
+ /** */
+ private static final int NODES_CNT = 4;
+
+ /** */
+ protected static final int THREADS_CNT = 5;
+
+ /** */
+ private static final Random RND = new Random();
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return NODES_CNT;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSemaphore() throws Exception {
+ checkSemaphore();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkSemaphore() throws Exception {
+ // Test API.
+
+ checkAcquire();
+
+ checkRelease();
+
+ checkFair();
+
+ // Test main functionality.
+ IgniteSemaphore semaphore1 = grid(0).semaphore("semaphore", -2, true, true);
+
+ assertEquals(-2, semaphore1.availablePermits());
+
+ IgniteCompute comp = grid(0).compute().withAsync();
+
+ comp.call(new IgniteCallable<Object>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @LoggerResource
+ private IgniteLogger log;
+
+ @Nullable @Override public Object call() throws Exception {
+ // Test semaphore in multiple threads on each node.
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+ new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ IgniteSemaphore semaphore = ignite.semaphore("semaphore", -2, true, true);
+
+ assert semaphore != null && semaphore.availablePermits() == -2;
+
+ log.info("Thread is going to wait on semaphore: " + Thread.currentThread().getName());
+
+ assert semaphore.tryAcquire(1, 1, MINUTES);
+
+ log.info("Thread is again runnable: " + Thread.currentThread().getName());
+
+ semaphore.release();
+
+ return null;
+ }
+ },
+ 5,
+ "test-thread"
+ );
+
+ fut.get();
+
+ return null;
+ }
+ });
+
+ IgniteFuture<Object> fut = comp.future();
+
+ Thread.sleep(3000);
+
+ semaphore1.release(2);
+
+ assert semaphore1.availablePermits() == 0;
+
+ semaphore1.release(1);
+
+ // Ensure there are no hangs.
+ fut.get();
+
+ // Test operations on removed semaphore.
+ semaphore1.close();
+
+ checkRemovedSemaphore(semaphore1);
+ }
+
+ /**
+ * @param semaphore Semaphore.
+ * @throws Exception If failed.
+ */
+ protected void checkRemovedSemaphore(final IgniteSemaphore semaphore) throws Exception {
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return semaphore.removed();
+ }
+ }, 5000);
+
+ assert semaphore.removed();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkFair() throws Exception {
+ // Only checks that the semaphore is created with the requested fairness flag.
+ IgniteSemaphore semaphore = createSemaphore("rmv", 5, true);
+
+ assert semaphore.isFair();
+
+ removeSemaphore("rmv");
+
+ IgniteSemaphore semaphore1 = createSemaphore("rmv1", 5, false);
+
+ assert !semaphore1.isFair();
+
+ removeSemaphore("rmv1");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkAcquire() throws Exception {
+ // Check only 'false' cases here. Successful acquire is tested over the grid.
+ IgniteSemaphore semaphore = createSemaphore("acquire", 5, false);
+
+ assert !semaphore.tryAcquire(10);
+ assert !semaphore.tryAcquire(10, 10, MICROSECONDS);
+
+ removeSemaphore("acquire");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkRelease() throws Exception {
+ IgniteSemaphore semaphore = createSemaphore("release", 5, false);
+
+ semaphore.release();
+ assert semaphore.availablePermits() == 6;
+
+ semaphore.release(2);
+ assert semaphore.availablePermits() == 8;
+
+ assert semaphore.drainPermits() == 8;
+ assert semaphore.availablePermits() == 0;
+
+ removeSemaphore("release");
+
+ checkRemovedSemaphore(semaphore);
+
+ IgniteSemaphore semaphore2 = createSemaphore("release2", -5, false);
+
+ semaphore2.release();
+
+ assert semaphore2.availablePermits() == -4;
+
+ semaphore2.release(2);
+
+ assert semaphore2.availablePermits() == -2;
+
+ assert semaphore2.drainPermits() == -2;
+
+ assert semaphore2.availablePermits() == 0;
+
+ removeSemaphore("release2");
+
+ checkRemovedSemaphore(semaphore2);
+ }
+
+ /**
+ * @param semaphoreName Semaphore name.
+ * @param numPermissions Initial number of permissions.
+ * @param fair Fairness flag.
+ * @return New semaphore.
+ * @throws Exception If failed.
+ */
+ private IgniteSemaphore createSemaphore(String semaphoreName, int numPermissions, boolean fair)
+ throws Exception {
+ IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, numPermissions, fair, true);
+
+ // Test initialization.
+ assert semaphoreName.equals(semaphore.name());
+ assert semaphore.availablePermits() == numPermissions;
+ assert semaphore.getQueueLength() == 0;
+ assert semaphore.isFair() == fair;
+
+ return semaphore;
+ }
+
+ /**
+ * @param semaphoreName Semaphore name.
+ * @throws Exception If failed.
+ */
+ private void removeSemaphore(String semaphoreName)
+ throws Exception {
+ IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 10, false, true);
+
+ assert semaphore != null;
+
+ if (semaphore.availablePermits() < 0)
+ semaphore.release(-semaphore.availablePermits());
+
+ // Remove semaphore on random node.
+ IgniteSemaphore semaphore0 = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 0, false, true);
+
+ assertNotNull(semaphore0);
+
+ semaphore0.close();
+
+ // Ensure semaphore is removed on all nodes.
+ for (Ignite g : G.allGrids())
+ assertNull(((IgniteKernal)g).context().dataStructures().semaphore(semaphoreName, 10, true, false));
+
+ checkRemovedSemaphore(semaphore);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreMultinode1() throws Exception {
+ if (gridCount() == 1)
+ return;
+
+ IgniteSemaphore semaphore = grid(0).semaphore("s1", 0, true, true);
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < gridCount(); i++) {
+ final Ignite ignite = grid(i);
+
+ futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteSemaphore semaphore = ignite.semaphore("s1", 0, true, false);
+
+ assertNotNull(semaphore);
+
+ boolean wait = semaphore.tryAcquire(30_000, MILLISECONDS);
+
+ assertTrue(wait);
+
+ return null;
+ }
+ }));
+ }
+
+ for (int i = 0; i < 10; i++)
+ semaphore.release();
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get(30_000);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
new file mode 100644
index 0000000..a516fc1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.local;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ *
+ */
+public class IgniteLocalSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode atomicsCacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testSemaphore() throws Exception {
+ // Test main functionality.
+ IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true);
+
+ assertNotNull(semaphore);
+
+ assertEquals(-2, semaphore.availablePermits());
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+ new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true);
+
+ assert semaphore != null && semaphore.availablePermits() == -2;
+
+ info("Thread is going to wait on semaphore: " + Thread.currentThread().getName());
+
+ assert semaphore.tryAcquire(1, 1, MINUTES);
+
+ info("Thread is again runnable: " + Thread.currentThread().getName());
+
+ semaphore.release();
+
+ return null;
+ }
+ },
+ THREADS_CNT,
+ "test-thread"
+ );
+
+ Thread.sleep(3000);
+
+ assert semaphore.availablePermits() == -2;
+
+ semaphore.release(2);
+
+ assert semaphore.availablePermits() == 0;
+
+ semaphore.release();
+
+ // Ensure there are no hangs.
+ fut.get();
+
+ // Test operations on removed semaphore.
+ IgniteSemaphore semaphore0 = grid(0).semaphore("semaphore", 0, false, false);
+
+ assertNotNull(semaphore0);
+
+ semaphore0.close();
+
+ checkRemovedSemaphore(semaphore0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
new file mode 100644
index 0000000..059e80a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class IgnitePartitionedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode atomicsCacheMode() {
+ return PARTITIONED;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
new file mode 100644
index 0000000..f58754f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.replicated;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class IgniteReplicatedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode atomicsCacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
index 2fa8940..d4ca9a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CollectionConfiguration;
@@ -60,6 +61,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
/** Count down latch name. */
private static final String TEST_LATCH_NAME = "test-latch";
+ /** Semaphore name. */
+ private static final String TEST_SEMAPHORE_NAME = "test-semaphore";
+
/** */
private static final CollectionConfiguration colCfg = new CollectionConfiguration();
@@ -69,6 +73,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
/** Count down latch initial count. */
private static final int LATCH_INIT_CNT = 1000;
+ /** Semaphore initial count. */
+ private static final int SEMAPHORE_INIT_CNT = 1000;
+
/** */
private static final boolean LONG = false;
@@ -88,6 +95,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
private static final boolean LATCH = true;
/** */
+ private static final boolean SEMAPHORE = true;
+
+ /** */
private GridCacheDataStructuresLoadTest() {
// No-op
}
@@ -95,210 +105,247 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
/** Atomic long write closure. */
private final CIX1<Ignite> longWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- al.addAndGet(RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ al.addAndGet(RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic long read closure. */
private final CIX1<Ignite> longReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- al.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ al.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Atomic reference write closure. */
private final CIX1<Ignite> refWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
- null, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
+ null, true);
- for (int i = 0; i < operationsPerTx; i++) {
- ar.set(RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ ar.set(RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic reference read closure. */
private final CIX1<Ignite> refReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
- true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
+ true);
- for (int i = 0; i < operationsPerTx; i++) {
- ar.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ ar.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Atomic sequence write closure. */
private final CIX1<Ignite> seqWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.addAndGet(RAND.nextInt(MAX_INT) + 1);
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.addAndGet(RAND.nextInt(MAX_INT) + 1);
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic sequence read closure. */
private final CIX1<Ignite> seqReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Atomic stamped write closure. */
private final CIX1<Ignite> stampWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
- 0, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
+ 0, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic stamped read closure. */
private final CIX1<Ignite> stampReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
- 0, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
+ 0, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Queue write closure. */
private final CIX1<Ignite> queueWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
+ @Override public void applyx(Ignite ignite) {
+ IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
- for (int i = 0; i < operationsPerTx; i++) {
- q.put(RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ q.put(RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Queue read closure. */
private final CIX1<Ignite> queueReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
+ @Override public void applyx(Ignite ignite) {
+ IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
- for (int i = 0; i < operationsPerTx; i++) {
- q.peek();
+ for (int i = 0; i < operationsPerTx; i++) {
+ q.peek();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Count down latch write closure. */
private final CIX1<Ignite> latchWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
- for (int i = 0; i < operationsPerTx; i++) {
- l.countDown();
+ for (int i = 0; i < operationsPerTx; i++) {
+ l.countDown();
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Count down latch read closure. */
private final CIX1<Ignite> latchReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
- for (int i = 0; i < operationsPerTx; i++) {
- l.count();
+ for (int i = 0; i < operationsPerTx; i++) {
+ l.count();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
+
+ /** Semaphore write closure. */
+ private final CIX1<Ignite> semaphoreWriteClos =
+ new CIX1<Ignite>() {
+ @Override public void applyx(Ignite ignite) {
+ IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true);
+
+ for (int i = 0; i < operationsPerTx; i++) {
+ if ((i % 2) == 0)
+ s.release();
+ else
+ s.acquire();
+
+ long cnt = writes.incrementAndGet();
+
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
+ }
+ };
+
+ /** Semaphore read closure. */
+ private final CIX1<Ignite> semaphoreReadClos =
+ new CIX1<Ignite>() {
+ @Override public void applyx(Ignite ignite) {
+ IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true);
+
+ for (int i = 0; i < operationsPerTx; i++) {
+ s.availablePermits();
+
+ long cnt = reads.incrementAndGet();
+
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
+ }
+ };
/**
* @param args Arguments.
@@ -362,6 +409,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
test.loadTestIgnite(test.latchWriteClos, test.latchReadClos);
}
+
+ System.gc();
+
+ if (SEMAPHORE) {
+ info("Testing semaphore...");
+
+ test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos);
+ }
}
}
@@ -407,7 +462,7 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
@Nullable @Override public Object call() throws Exception {
long start = System.currentTimeMillis();
- while(!done.get()) {
+ while (!done.get()) {
if (tx) {
try (Transaction tx = ignite.transactions().txStart()) {
readClos.apply(ignite);
@@ -447,4 +502,4 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+}
[3/6] ignite git commit: Fixes formatting issues;
Posted by yz...@apache.org.
Fixes formatting issues;
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/77f9deb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/77f9deb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/77f9deb7
Branch: refs/heads/ignite-638
Commit: 77f9deb7f79acb04b73994e83efd4539646073e6
Parents: be332a8
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Sat Oct 3 12:44:02 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Sat Oct 3 12:44:02 2015 +0200
----------------------------------------------------------------------
.../datastructures/GridCacheSemaphoreImpl.java | 158 +++++++++++--------
.../datastructures/GridCacheSemaphoreState.java | 12 +-
2 files changed, 97 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/77f9deb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 17efc61..d2a966f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -96,30 +96,61 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
abstract class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
+ /** Thread map. */
protected final ConcurrentMap<Thread, Integer> threadMap;
+
+ /** Total number of threads currently waiting on this semaphore. */
protected int totalWaiters;
- Sync(int permits) {
+ protected Sync(int permits) {
setState(permits);
threadMap = new ConcurrentHashMap<>();
}
+ /**
+ * Sets the estimate of the total current number of threads waiting on this semaphore. This method should only
+ * be called in {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+ *
+ * @param waiters Thread count.
+ */
protected synchronized void setWaiters(int waiters) {
totalWaiters = waiters;
}
+ /**
+ * Gets the number of waiting threads.
+ *
+ * @return Number of threads waiting on this semaphore.
+ */
public int getWaiters() {
return totalWaiters;
}
+ /**
+ * Sets the number of permits currently available on this semaphore. This method should only be used in
+ * {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+ *
+ * @param permits Number of permits available at this semaphore.
+ */
final synchronized void setPermits(int permits) {
setState(permits);
}
+ /**
+ * Gets the number of permits currently available.
+ *
+ * @return Number of permits available at this semaphore.
+ */
final int getPermits() {
return getState();
}
+ /**
+ * This method is used by the AQS to test if the current thread should block or not.
+ *
+ * @param acquires Number of permits to acquire.
+ * @return Negative number if thread should block, non-negative if thread successfully acquires permits.
+ */
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
@@ -137,7 +168,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- protected final boolean tryReleaseShared(int releases) {
+ /** {@inheritDoc} */
+ @Override protected final boolean tryReleaseShared(int releases) {
// Check if some other node updated the state.
// This method is called with release==0 only when trying to wake through update.
if (releases == 0)
@@ -156,20 +188,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- final void reducePermits(int reductions) {
- for (; ; ) {
- int current = getState();
-
- int next = current - reductions;
-
- if (next > current) // underflow
- throw new Error("Permit count underflow");
-
- if (compareAndSetGlobalState(current, next))
- return;
- }
- }
-
+ /**
+ * This method is used internally to implement {@linkplain GridCacheSemaphoreImpl#drainPermits()}.
+ *
+ * @return Number of permits to drain.
+ */
final int drainPermits() {
for (; ; ) {
@@ -180,12 +203,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
+ /**
+ * This method is used when a thread blocks on this semaphore to synchronize the waiting thread counter across all
+ * nodes.
+ */
protected void getAndIncWaitingCount() {
try {
CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
+ @Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -221,12 +247,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
+ /**
+ * This method is used for synchronizing the semaphore state across all nodes.
+ */
protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
try {
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
+ @Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -281,7 +309,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
super(permits);
}
- protected int tryAcquireShared(int acquires) {
+ /** {@inheritDoc} */
+ @Override protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
@@ -296,7 +325,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
super(permits);
}
- protected int tryAcquireShared(int acquires) {
+ /** {@inheritDoc} */
+ @Override protected int tryAcquireShared(int acquires) {
for (; ; ) {
if (hasQueuedPredecessors())
return -1;
@@ -355,8 +385,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
try {
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
- @Override
- public Sync call() throws Exception {
+ @Override public Sync call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -428,18 +457,18 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.releaseShared(0);
}
- @Override
- public void needCheckNotRemoved() {
+ /** {@inheritDoc} */
+ @Override public void needCheckNotRemoved() {
// No-op.
}
- @Override
- public void acquire() throws IgniteException {
+ /** {@inheritDoc} */
+ @Override public void acquire() throws IgniteException {
acquire(1);
}
- @Override
- public void acquire(int permits) throws IgniteInterruptedException {
+ /** {@inheritDoc} */
+ @Override public void acquire(int permits) throws IgniteInterruptedException {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -455,8 +484,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void acquireUninterruptibly() {
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly() {
try {
initializeSemaphore();
@@ -467,8 +496,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void acquireUninterruptibly(int permits) {
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
@@ -480,15 +509,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public int availablePermits() {
+ /** {@inheritDoc} */
+ @Override public int availablePermits() {
int ret;
try {
initializeSemaphore();
ret = CU.outTx(
retryTopologySafe(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
+ @Override public Integer call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -512,8 +540,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return ret;
}
- @Override
- public int drainPermits() {
+ /** {@inheritDoc} */
+ @Override public int drainPermits() {
try {
initializeSemaphore();
@@ -524,8 +552,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire() {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire() {
try {
initializeSemaphore();
@@ -536,8 +564,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
try {
initializeSemaphore();
@@ -551,13 +579,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void release() {
+ /** {@inheritDoc} */
+ @Override public void release() {
release(1);
}
- @Override
- public void release(int permits) {
+ /** {@inheritDoc} */
+ @Override public void release(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -570,8 +598,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire(int permits) {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -584,8 +612,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
@@ -600,13 +628,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean isFair() {
- return false;
+ /** {@inheritDoc} */
+ @Override public boolean isFair() {
+ return isFair;
}
- @Override
- public boolean hasQueuedThreads() {
+ /** {@inheritDoc} */
+ @Override public boolean hasQueuedThreads() {
try {
initializeSemaphore();
@@ -617,8 +645,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public int getQueueLength() {
+ /** {@inheritDoc} */
+ @Override public int getQueueLength() {
try {
initializeSemaphore();
@@ -629,22 +657,22 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx.kernalContext());
out.writeUTF(name);
}
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
IgniteBiTuple<GridKernalContext, String> t = stash.get();
t.set1((GridKernalContext)in.readObject());
t.set2(in.readUTF());
}
- @Override
- public void close() {
+ /** {@inheritDoc} */
+ @Override public void close() {
if (!rmvd) {
try {
ctx.kernalContext().dataStructures().removeSemaphore(name);
http://git-wip-us.apache.org/repos/asf/ignite/blob/77f9deb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index a02b7c9..e25649e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -87,16 +87,14 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* {@inheritDoc}
*/
- @Override
- public Object clone() throws CloneNotSupportedException {
+ @Override public Object clone() throws CloneNotSupportedException {
return super.clone();
}
/**
* {@inheritDoc}
*/
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(cnt);
out.writeInt(waiters);
out.writeBoolean(fair);
@@ -105,8 +103,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* {@inheritDoc}
*/
- @Override
- public void readExternal(ObjectInput in) throws IOException {
+ @Override public void readExternal(ObjectInput in) throws IOException {
cnt = in.readInt();
waiters = in.readInt();
fair = in.readBoolean();
@@ -115,8 +112,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* {@inheritDoc}
*/
- @Override
- public String toString() {
+ @Override public String toString() {
return S.toString(GridCacheSemaphoreState.class, this);
}
}