You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/10/23 18:10:43 UTC
[2/6] ignite git commit: Fixes formatting issues;
Fixes formatting issues;
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be332a82
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be332a82
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be332a82
Branch: refs/heads/ignite-638
Commit: be332a82711ddd7e9088e00d7f26edd7de407a11
Parents: e9567ad
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Oct 1 20:19:32 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 1 20:19:32 2015 +0200
----------------------------------------------------------------------
.../datastructures/IgniteSemaphoreExample.java | 68 ++--
.../java/org/apache/ignite/IgniteSemaphore.java | 396 +++++++------------
.../datastructures/GridCacheSemaphoreEx.java | 6 +-
.../datastructures/GridCacheSemaphoreImpl.java | 346 +++++++++-------
.../datastructures/GridCacheSemaphoreState.java | 9 +-
.../datastructures/GridCacheSemaphoreValue.java | 115 ------
6 files changed, 396 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 2ef242c..5849f5f 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,22 +1,20 @@
package org.apache.ignite.examples.datastructures;
-import org.apache.ignite.*;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.examples.ExampleNodeStartup;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
/**
- * This example demonstrates cache based semaphore.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-ignite.xml} configuration.
- *
- * @author Vladisav Jelisavcic
+ * This example demonstrates cache based semaphore. <p> Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. <p> Alternatively
+ * you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
*/
public class IgniteSemaphoreExample {
/** Cache name. */
@@ -40,7 +38,7 @@ public class IgniteSemaphoreExample {
System.out.println(">>> Cache atomic semaphore example started.");
// Initialize semaphore.
- IgniteSemaphore syncSemaphore = ignite.semaphore(syncName,0,false,true);
+ IgniteSemaphore syncSemaphore = ignite.semaphore(syncName, 0, false, true);
// Make name of semaphore.
final String semaphoreName = UUID.randomUUID().toString();
@@ -50,21 +48,25 @@ public class IgniteSemaphoreExample {
// Make shared resource
final String resourceName = UUID.randomUUID().toString();
+
+ // Get cache view where the resource will be held
IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+ // Put the resource queue the cache
cache.put(resourceName, new LinkedList<>());
// Initialize semaphore.
IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
// Initialize mutex.
- IgniteSemaphore mutex = ignite.semaphore(mutexName,1,false,true);
+ IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, false, true);
// Start consumers on all cluster nodes.
for (int i = 0; i < NUM_CONSUMERS; i++)
ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
// Start producers on all cluster nodes.
- for(int i = 0; i < NUM_PRODUCERS; i++)
+ for (int i = 0; i < NUM_PRODUCERS; i++)
ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
System.out.println("Master node is waiting for all other nodes to finish...");
@@ -121,23 +123,33 @@ public class IgniteSemaphoreExample {
/** {@inheritDoc} */
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+ IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
- for(int i=0;i<ITEM_COUNT;i++) {
+ for (int i = 0; i < ITEM_COUNT; i++) {
+ // Mutex is used to access shared resource.
mutex.acquire();
- Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+ Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
queue.add(Ignition.ignite().cluster().localNode().id().toString());
+
Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
+ // Mutex is released for others to access the resource.
mutex.release();
+ // Signals others that shared resource is available.
semaphore.release();
}
System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
- IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+
+ // Gets the syncing semaphore
+ IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+
+ // Signals the master thread
sync.release();
}
}
@@ -159,23 +171,33 @@ public class IgniteSemaphoreExample {
/** {@inheritDoc} */
@Override public void run() {
IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
- IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+ IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
- for(int i=0;i<ITEM_COUNT;i++) {
+ for (int i = 0; i < ITEM_COUNT; i++) {
+ // Block if queue is empty.
semaphore.acquire();
+ // Mutex is used to access shared resource.
mutex.acquire();
- Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+ Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
String data = queue.remove();
+
Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
+ // Signals others that shared resource is available.
mutex.release();
}
System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
- IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+
+ // Gets the syncing semaphore
+ IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+
+ // Signals the master thread
sync.release();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index 0e29d00..5a4b377 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -4,17 +4,12 @@ import java.io.Closeable;
import java.util.concurrent.TimeUnit;
/**
- * This interface provides a rich API for working with distributed semaphore.
- * <p>
- * <h1 class="header">Functionality</h1>
- * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
- * <h1 class="header">Creating Distributed Semaphore</h1>
- * Instance of cache semaphore can be created by calling the following method:
- * {@link Ignite#semaphore(String, int, boolean, boolean)}.
- *
- * @author Vladisav Jelisavcic
+ * This interface provides a rich API for working with distributed semaphore. <p> <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. <h1
+ * class="header">Creating Distributed Semaphore</h1> Instance of cache semaphore can be created by calling the
+ * following method: {@link Ignite#semaphore(String, int, boolean, boolean)}.
*/
-public interface IgniteSemaphore extends Closeable{
+public interface IgniteSemaphore extends Closeable {
/**
* Gets name of the semaphore.
*
@@ -23,151 +18,110 @@ public interface IgniteSemaphore extends Closeable{
public String name();
/**
- * Acquires a permit from this semaphore, blocking until one is
- * available, or the thread is {@linkplain Thread#interrupt interrupted}.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * one of two things happens:
- * <ul>
- * <li>Some other thread invokes the {@link #release} method for this
- * semaphore and the current thread is next to be assigned a permit; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread.
- * </ul>
- *
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * for a permit,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
+ * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain
+ * Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+ * one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this
+ * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+ * Thread#interrupt interrupts} the current thread. </ul>
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+ * and the current thread's interrupted status is cleared.
*
* @throws IgniteInterruptedException if the current thread is interrupted
*/
public void acquire() throws IgniteInterruptedException;
/**
- * Acquires a permit from this semaphore, blocking until one is
- * available.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * some other thread invokes the {@link #release} method for this
- * semaphore and the current thread is next to be assigned a permit.
- *
- * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
- * while waiting for a permit then it will continue to wait, but the
- * time at which the thread is assigned a permit may change compared to
- * the time it would have received the permit had no interruption
- * occurred. When the thread does return from this method its interrupt
- * status will be set.
+ * Acquires a permit from this semaphore, blocking until one is available.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+ * one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is
+ * next to be assigned a permit.
+ *
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will
+ * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would
+ * have received the permit had no interruption occurred. When the thread does return from this method its
+ * interrupt status will be set.
*/
public void acquireUninterruptibly();
/**
- * Acquires a permit from this semaphore, only if one is available at the
- * time of invocation.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * with the value {@code true},
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then this method will return
- * immediately with the value {@code false}.
- *
- * <p>Even when this semaphore has been set to use a
- * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
- * immediately acquire a permit if one is available, whether or not
- * other threads are currently waiting.
- * This "barging" behavior can be useful in certain
- * circumstances, even though it breaks fairness. If you want to honor
- * the fairness setting, then use
- * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
- * which is almost equivalent (it also detects interruption).
- *
- * @return {@code true} if a permit was acquired and {@code false}
- * otherwise
+ * Acquires a permit from this semaphore, only if one is available at the time of invocation.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+ * number of available permits by one.
+ *
+ * <p>If no permit is available then this method will return immediately with the value {@code false}.
+ *
+ * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire()}
+ * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+ * waiting. This "barging" behavior can be useful in certain circumstances, even though it breaks
+ * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(long, TimeUnit) tryAcquire(0,
+ * TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
+ *
+ * @return {@code true} if a permit was acquired and {@code false} otherwise
*/
public boolean tryAcquire();
/**
- * Acquires a permit from this semaphore, if one becomes available
- * within the given waiting time and the current thread has not
- * been {@linkplain Thread#interrupt interrupted}.
- *
- * <p>Acquires a permit, if one is available and returns immediately,
- * with the value {@code true},
- * reducing the number of available permits by one.
- *
- * <p>If no permit is available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * one of three things happens:
- * <ul>
- * <li>Some other thread invokes the {@link #release} method for this
- * semaphore and the current thread is next to be assigned a permit; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread; or
- * <li>The specified waiting time elapses.
- * </ul>
+ * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current
+ * thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+ * number of available permits by one.
+ *
+ * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for
+ * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+ * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul>
*
* <p>If a permit is acquired then the value {@code true} is returned.
*
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * to acquire a permit,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is
+ * thrown and the current thread's interrupted status is cleared.
*
- * <p>If the specified waiting time elapses then the value {@code false}
- * is returned. If the time is less than or equal to zero, the method
- * will not wait at all.
+ * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all.
*
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
- * @return {@code true} if a permit was acquired and {@code false}
- * if the waiting time elapsed before a permit was acquired
+ * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was
+ * acquired
* @throws IgniteInterruptedException if the current thread is interrupted
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
- throws IgniteInterruptedException;
+ throws IgniteInterruptedException;
/**
- * Acquires the given number of permits from this semaphore,
- * blocking until all are available.
+ * Acquires the given number of permits from this semaphore, blocking until all are available.
*
- * <p>Acquires the given number of permits, if they are available,
- * and returns immediately, reducing the number of available permits
- * by the given amount.
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+ * available permits by the given amount.
*
- * <p>If insufficient permits are available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * some other thread invokes one of the {@link #release() release}
- * methods for this semaphore, the current thread is next to be assigned
- * permits and the number of available permits satisfies this request.
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this
+ * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this
+ * request.
*
- * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
- * while waiting for permits then it will continue to wait and its
- * position in the queue is not affected. When the thread does return
- * from this method its interrupt status will be set.
+ * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will
+ * continue to wait and its position in the queue is not affected. When the thread does return from this method its
+ * interrupt status will be set.
*
* @param permits the number of permits to acquire
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void acquireUninterruptibly(int permits);
-
/**
* Returns the current number of permits available in this semaphore.
*
@@ -187,131 +141,91 @@ public interface IgniteSemaphore extends Closeable{
/**
* Releases a permit, returning it to the semaphore.
*
- * <p>Releases a permit, increasing the number of available permits by
- * one. If any threads are trying to acquire a permit, then one is
- * selected and given the permit that was just released. That thread
- * is (re)enabled for thread scheduling purposes.
+ * <p>Releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a
+ * permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread
+ * scheduling purposes.
*
- * <p>There is no requirement that a thread that releases a permit must
- * have acquired that permit by calling {@link #acquire}.
- * Correct usage of a semaphore is established by programming convention
- * in the application.
+ * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+ * #acquire}. Correct usage of a semaphore is established by programming convention in the application.
*/
public void release();
/**
- * Acquires the given number of permits from this semaphore, if all
- * become available within the given waiting time and the current
- * thread has not been {@linkplain Thread#interrupt interrupted}.
+ * Acquires the given number of permits from this semaphore, if all become available within the given waiting time
+ * and the current thread has not been {@linkplain Thread#interrupt interrupted}.
*
- * <p>Acquires the given number of permits, if they are available and
- * returns immediately, with the value {@code true},
- * reducing the number of available permits by the given amount.
- *
- * <p>If insufficient permits are available then
- * the current thread becomes disabled for thread scheduling
- * purposes and lies dormant until one of three things happens:
- * <ul>
- * <li>Some other thread invokes one of the {@link #release() release}
- * methods for this semaphore, the current thread is next to be assigned
- * permits and the number of available permits satisfies this request; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread; or
- * <li>The specified waiting time elapses.
- * </ul>
+ * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code
+ * true}, reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link
+ * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number
+ * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread; or <li>The specified waiting time elapses. </ul>
*
* <p>If the permits are acquired then the value {@code true} is returned.
*
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * to acquire the permits,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * Any permits that were to be assigned to this thread, are instead
- * assigned to other threads trying to acquire permits, as if
- * the permits had been made available by a call to {@link #release()}.
- *
- * <p>If the specified waiting time elapses then the value {@code false}
- * is returned. If the time is less than or equal to zero, the method
- * will not wait at all. Any permits that were to be assigned to this
- * thread, are instead assigned to other threads trying to acquire
- * permits, as if the permits had been made available by a call to
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException}
+ * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this
+ * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made
+ * available by a call to {@link #release()}.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all. Any permits that were to be assigned to this thread, are instead
+ * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to
* {@link #release()}.
*
* @param permits the number of permits to acquire
* @param timeout the maximum time to wait for the permits
* @param unit the time unit of the {@code timeout} argument
- * @return {@code true} if all permits were acquired and {@code false}
- * if the waiting time elapsed before all permits were acquired
+ * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all
+ * permits were acquired
* @throws IgniteInterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- throws IgniteInterruptedException;
+ throws IgniteInterruptedException;
/**
- * Acquires the given number of permits from this semaphore, only
- * if all are available at the time of invocation.
- *
- * <p>Acquires the given number of permits, if they are available, and
- * returns immediately, with the value {@code true},
- * reducing the number of available permits by the given amount.
- *
- * <p>If insufficient permits are available then this method will return
- * immediately with the value {@code false} and the number of available
- * permits is unchanged.
- *
- * <p>Even when this semaphore has been set to use a fair ordering
- * policy, a call to {@code tryAcquire} <em>will</em>
- * immediately acquire a permit if one is available, whether or
- * not other threads are currently waiting. This
- * "barging" behavior can be useful in certain
- * circumstances, even though it breaks fairness. If you want to
- * honor the fairness setting, then use {@link #tryAcquire(int,
- * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
- * which is almost equivalent (it also detects interruption).
+ * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code
+ * true}, reducing the number of available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then this method will return immediately with the value {@code false}
+ * and the number of available permits is unchanged.
+ *
+ * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire}
+ * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+ * waiting. This "barging" behavior can be useful in certain circumstances, even though it breaks
+ * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(int, long, TimeUnit)
+ * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
*
* @param permits the number of permits to acquire
- * @return {@code true} if the permits were acquired and
- * {@code false} otherwise
+ * @return {@code true} if the permits were acquired and {@code false} otherwise
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits);
/**
- * Acquires the given number of permits from this semaphore,
- * blocking until all are available,
- * or the thread is {@linkplain Thread#interrupt interrupted}.
- *
- * <p>Acquires the given number of permits, if they are available,
- * and returns immediately, reducing the number of available permits
- * by the given amount.
- *
- * <p>If insufficient permits are available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until
- * one of two things happens:
- * <ul>
- * <li>Some other thread invokes one of the {@link #release() release}
- * methods for this semaphore, the current thread is next to be assigned
- * permits and the number of available permits satisfies this request; or
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread.
- * </ul>
- *
- * <p>If the current thread:
- * <ul>
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@linkplain Thread#interrupt interrupted} while waiting
- * for a permit,
- * </ul>
- * then {@link IgniteInterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * Any permits that were to be assigned to this thread are instead
- * assigned to other threads trying to acquire permits, as if
- * permits had been made available by a call to {@link #release()}.
+ * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is
+ * {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+ * available permits by the given amount.
+ *
+ * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+ * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release()
+ * release} methods for this semaphore, the current thread is next to be assigned permits and the number of
+ * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+ * current thread. </ul>
+ *
+ * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+ * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+ * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are
+ * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to
+ * {@link #release()}.
*
* @param permits the number of permits to acquire
* @throws IgniteInterruptedException if the current thread is interrupted
@@ -322,21 +236,16 @@ public interface IgniteSemaphore extends Closeable{
/**
* Releases the given number of permits, returning them to the semaphore.
*
- * <p>Releases the given number of permits, increasing the number of
- * available permits by that amount.
- * If any threads are trying to acquire permits, then one
- * is selected and given the permits that were just released.
- * If the number of available permits satisfies that thread's request
- * then that thread is (re)enabled for thread scheduling purposes;
- * otherwise the thread will wait until sufficient permits are available.
- * If there are still permits available
- * after this thread's request has been satisfied, then those permits
- * are assigned in turn to other threads trying to acquire permits.
- *
- * <p>There is no requirement that a thread that releases a permit must
- * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
- * Correct usage of a semaphore is established by programming convention
- * in the application.
+ * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
+ * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the
+ * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling
+ * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits
+ * available after this thread's request has been satisfied, then those permits are assigned in turn to other
+ * threads trying to acquire permits.
+ *
+ * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+ * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the
+ * application.
*
* @param permits the number of permits to release
* @throws IllegalArgumentException if {@code permits} is negative
@@ -351,23 +260,18 @@ public interface IgniteSemaphore extends Closeable{
public boolean isFair();
/**
- * Queries whether any threads are waiting to acquire. Note that
- * because cancellations may occur at any time, a {@code true}
- * return does not guarantee that any other thread will ever
- * acquire. This method is designed primarily for use in
- * monitoring of the system state.
- *
- * @return {@code true} if there may be other threads waiting to
- * acquire the lock
+ * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a
+ * {@code true} return does not guarantee that any other thread will ever acquire. This method is designed
+ * primarily for use in monitoring of the system state.
+ *
+ * @return {@code true} if there may be other threads waiting to acquire the lock
*/
public boolean hasQueuedThreads();
/**
- * Returns an estimate of the number of threads waiting to acquire.
- * The value is only an estimate because the number of threads may
- * change dynamically while this method traverses internal data
- * structures. This method is designed for use in monitoring of the
- * system state, not for synchronization control.
+ * Returns an estimate of the number of threads waiting to acquire. The value is only an estimate because the number
+ * of threads may change dynamically while this method traverses internal data structures. This method is designed
+ * for use in monitoring of the system state, not for synchronization control.
*
* @return the estimated number of threads waiting for this lock
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 0f939d5..8ecbcc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -3,7 +3,7 @@ package org.apache.ignite.internal.processors.datastructures;
import org.apache.ignite.IgniteSemaphore;
/**
- * Created by vladisav on 20.9.15..
+ * Grid cache semaphore ({@code 'Ex'} stands for external).
*/
public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
/**
@@ -16,7 +16,7 @@ public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovabl
/**
* Callback to notify semaphore on changes.
*
- * @param val Id of the caller and number of permissions to acquire (or release; can be negative).
+ * @param val State containing the number of available permissions.
*/
public void onUpdate(GridCacheSemaphoreState val);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 24c3ec5..17efc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -1,6 +1,20 @@
package org.apache.ignite.internal.processors.datastructures;
-import org.apache.ignite.*;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -11,26 +25,16 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
- * Cache semaphore implementation based on AbstractQueuedSynchronizer.
- * Current implementation supports only unfair and locally fair modes.
- * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
- * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
- * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
- *
- * @author Vladisav Jelisavcic
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer. Current implementation supports only unfair and
+ * locally fair modes. When fairness set false, this class makes no guarantees about the order in which threads acquire
+ * permits. When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire
+ * methods are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
*/
public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
/** */
@@ -38,11 +42,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
- new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
- @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
- return F.t2();
- }
- };
+ new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+ @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+ return F.t2();
+ }
+ };
/** Logger. */
private IgniteLogger log;
@@ -50,7 +54,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** Semaphore name. */
private String name;
- /** Removed flag.*/
+ /** Removed flag. */
private volatile boolean rmvd;
/** Semaphore key. */
@@ -86,14 +90,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
/**
- * Synchronization implementation for semaphore. Uses AQS state
- * to represent permits. Subclassed into fair and nonfair
- * versions.
+ * Synchronization implementation for semaphore. Uses AQS state to represent permits. Subclassed into fair and
+ * nonfair versions.
*/
abstract class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
- protected ConcurrentMap<Thread,Integer> threadMap;
+ protected final ConcurrentMap<Thread, Integer> threadMap;
protected int totalWaiters;
Sync(int permits) {
@@ -101,7 +104,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
threadMap = new ConcurrentHashMap<>();
}
- protected synchronized void setWaiters(int waiters){
+ protected synchronized void setWaiters(int waiters) {
totalWaiters = waiters;
}
@@ -109,7 +112,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return totalWaiters;
}
- final synchronized void setPermits(int permits){
+ final synchronized void setPermits(int permits) {
setState(permits);
}
@@ -118,13 +121,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
+ for (; ; ) {
int available = getState();
+
int remaining = available - acquires;
if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
- if(remaining < 0){
- if(!threadMap.containsKey(Thread.currentThread()))
+ if (remaining < 0) {
+ if (!threadMap.containsKey(Thread.currentThread()))
getAndIncWaitingCount();
}
@@ -136,33 +140,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
protected final boolean tryReleaseShared(int releases) {
// Check if some other node updated the state.
// This method is called with release==0 only when trying to wake through update.
- if(releases == 0)
+ if (releases == 0)
return true;
- for (;;) {
+ for (; ; ) {
int current = getState();
+
int next = current + releases;
+
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
+
if (compareAndSetGlobalState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
- for (;;) {
+ for (; ; ) {
int current = getState();
+
int next = current - reductions;
+
if (next > current) // underflow
throw new Error("Permit count underflow");
+
if (compareAndSetGlobalState(current, next))
return;
}
}
final int drainPermits() {
- for (;;) {
+ for (; ; ) {
+
int current = getState();
+
if (current == 0 || compareAndSetGlobalState(current, 0))
return current;
}
@@ -171,34 +183,40 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
protected void getAndIncWaitingCount() {
try {
CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null)
- throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
- int waiting = val.getWaiters();
- sync.threadMap.put(Thread.currentThread(), waiting);
+ int waiting = val.getWaiters();
- waiting++;
- val.setWaiters(waiting);
- semaphoreView.put(key, val);
- tx.commit();
+ sync.threadMap.put(Thread.currentThread(), waiting);
- return true;
- } catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ waiting++;
- throw e;
- }
+ val.setWaiters(waiting);
+
+ semaphoreView.put(key, val);
+
+ tx.commit();
+
+ return true;
+ }
+ catch (Error | Exception e) {
+ U.error(log, "Failed to compare and set: " + this, e);
+
+ throw e;
}
- }),
- ctx
+ }
+ }),
+ ctx
);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -206,44 +224,48 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
try {
return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null)
- throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+ if (val == null)
+ throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
- boolean retVal = val.getCnt() == expVal;
+ boolean retVal = val.getCnt() == expVal;
- if (retVal) {
+ if (retVal) {
/* If current thread is queued, than this call is the call that is going to be unblocked. */
- if(sync.isQueued(Thread.currentThread())) {
-
- int waiting = val.getWaiters() - 1;
- val.setWaiters(waiting);
+ if (sync.isQueued(Thread.currentThread())) {
- sync.threadMap.remove(Thread.currentThread());
- }
+ int waiting = val.getWaiters() - 1;
- val.setCnt(newVal);
+ val.setWaiters(waiting);
- semaphoreView.put(key, val);
- tx.commit();
+ sync.threadMap.remove(Thread.currentThread());
}
- return retVal;
- } catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ val.setCnt(newVal);
+
+ semaphoreView.put(key, val);
- throw e;
+ tx.commit();
}
+
+ return retVal;
+ }
+ catch (Error | Exception e) {
+ U.error(log, "Failed to compare and set: " + this, e);
+
+ throw e;
}
- }),
- ctx
+ }
+ }),
+ ctx
);
- } catch( IgniteCheckedException e){
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -275,16 +297,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
protected int tryAcquireShared(int acquires) {
- for (;;) {
+ for (; ; ) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
+
int remaining = available - acquires;
if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
- if(remaining < 0){
- if(!threadMap.containsKey(Thread.currentThread()))
+ if (remaining < 0) {
+ if (!threadMap.containsKey(Thread.currentThread()))
getAndIncWaitingCount();
}
return remaining;
@@ -292,7 +315,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
-
}
/**
@@ -305,12 +327,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
* @param ctx Cache context.
*/
public GridCacheSemaphoreImpl(String name,
- int initCnt,
- boolean fair,
- GridCacheInternalKey key,
- IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
- GridCacheContext ctx)
- {
+ int initCnt,
+ boolean fair,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+ GridCacheContext ctx) {
assert name != null;
assert key != null;
assert semaphoreView != null;
@@ -333,28 +354,30 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (initGuard.compareAndSet(false, true)) {
try {
sync = CU.outTx(
- retryTopologySafe(new Callable<Sync>() {
- @Override
- public Sync call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Sync>() {
+ @Override
+ public Sync call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find semaphore with given name: " + name);
+ if (val == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find semaphore with given name: " + name);
- return null;
- }
+ return null;
+ }
- final int count = val.getCnt();
- tx.commit();
+ final int count = val.getCnt();
- return val.isFair() ? new FairSync(count) : new NonfairSync(count);
- }
+ tx.commit();
+
+ return val.isFair() ? new FairSync(count) : new NonfairSync(count);
}
- }),
- ctx
+ }
+ }),
+ ctx
);
+
if (log.isDebugEnabled())
log.debug("Initialized internal sync structure: " + sync);
}
@@ -370,17 +393,20 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
-
/** {@inheritDoc} */
@Override public String name() {
return name;
}
/** {@inheritDoc} */
- @Override public GridCacheInternalKey key() { return key; }
+ @Override public GridCacheInternalKey key() {
+ return key;
+ }
/** {@inheritDoc} */
- @Override public boolean removed(){ return rmvd; }
+ @Override public boolean removed() {
+ return rmvd;
+ }
/** {@inheritDoc} */
@Override public boolean onRemoved() {
@@ -389,7 +415,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** {@inheritDoc} */
@Override public void onUpdate(GridCacheSemaphoreState val) {
- if(sync == null)
+ if (sync == null)
return;
// Update permission count.
@@ -415,25 +441,28 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override
public void acquire(int permits) throws IgniteInterruptedException {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
- sync.acquireSharedInterruptibly(permits);
- } catch (IgniteCheckedException e) {
+ sync.acquireSharedInterruptibly(permits);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
-
@Override
public void acquireUninterruptibly() {
try {
initializeSemaphore();
- sync.acquireShared(1);
- } catch (IgniteCheckedException e) {
+ sync.acquireShared(1);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -443,38 +472,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
- sync.acquireShared(permits);
- } catch (IgniteCheckedException e) {
+ sync.acquireShared(permits);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@Override
- public int availablePermits(){
+ public int availablePermits() {
int ret;
try {
initializeSemaphore();
ret = CU.outTx(
- retryTopologySafe(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheSemaphoreState val = semaphoreView.get(key);
+ retryTopologySafe(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheSemaphoreState val = semaphoreView.get(key);
- if (val == null)
- throw new IgniteException("Failed to find semaphore with given name: " + name);
+ if (val == null)
+ throw new IgniteException("Failed to find semaphore with given name: " + name);
- int count = val.getCnt();
- tx.rollback();
+ int count = val.getCnt();
- return count;
- }
+ tx.rollback();
+
+ return count;
}
- }),
- ctx
+ }
+ }),
+ ctx
);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
return ret;
@@ -484,9 +516,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public int drainPermits() {
try {
initializeSemaphore();
- return sync.drainPermits();
- } catch (IgniteCheckedException e) {
+ return sync.drainPermits();
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -495,9 +528,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public boolean tryAcquire() {
try {
initializeSemaphore();
- return sync.nonfairTryAcquireShared(1) >= 0;
- } catch (IgniteCheckedException e) {
+ return sync.nonfairTryAcquireShared(1) >= 0;
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -506,11 +540,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
try {
initializeSemaphore();
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- } catch (IgniteCheckedException e) {
+ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
@@ -523,11 +559,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override
public void release(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
- sync.releaseShared(permits);
- } catch (IgniteCheckedException e) {
+ sync.releaseShared(permits);
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -535,11 +573,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
@Override
public boolean tryAcquire(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
try {
initializeSemaphore();
- return sync.nonfairTryAcquireShared(permits) >= 0;
- } catch (IgniteCheckedException e) {
+ return sync.nonfairTryAcquireShared(permits) >= 0;
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -549,11 +589,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
- return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
- } catch (IgniteCheckedException e) {
+ return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
@@ -567,8 +609,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public boolean hasQueuedThreads() {
try {
initializeSemaphore();
- return sync.getWaiters()!=0;
- } catch (IgniteCheckedException e) {
+
+ return sync.getWaiters() != 0;
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
@@ -577,8 +621,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
public int getQueueLength() {
try {
initializeSemaphore();
+
return sync.getWaiters();
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index cf44b7d..a02b7c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -1,18 +1,14 @@
package org.apache.ignite.internal.processors.datastructures;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Grid cache semaphore state.
- *
- * @author Vladisav Jelisavcic
*/
public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
/** */
@@ -33,7 +29,6 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
*/
private boolean fair;
-
/**
* Constructor.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
deleted file mode 100644
index 689b647..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.ignite.internal.processors.datastructures;
-
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-/**
- * Created by vladisav on 20.9.15..
- */
-public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Permission count.
- */
- private int cnt;
-
- /**
- * Semaphore ID.
- */
- private long semaphoreId;
-
- /**
- * Constructor.
- *
- * @param cnt Number of permissions.
- * @param
- */
- public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
- this.cnt = cnt;
-
- this.semaphoreId = semaphoreId;
- }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridCacheSemaphoreValue() {
- // No-op.
- }
-
- /**
- * @param cnt New count.
- */
- public void set(int cnt) {
- this.cnt = cnt;
- }
-
- /**
- * @return Current count.
- */
- public int get() {
- return cnt;
- }
-
- /**
- * @return true if number of permissions to be added is positive
- */
- public boolean isRelease(){
- return cnt>0;
- }
-
- /**
- * @return true if permission count should be lowered
- */
- public boolean isAwait(){
- return cnt<0;
- }
-
- /**
- * @return Semaphore ID.
- */
- public long semaphoreId() {
- return semaphoreId;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Object clone() throws CloneNotSupportedException {
- return super.clone();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(cnt);
- out.writeLong(semaphoreId);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void readExternal(ObjectInput in) throws IOException {
- cnt = in.readInt();
- semaphoreId = in.readLong();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString() {
- return S.toString(GridCacheSemaphoreValue.class, this);
- }
-}
\ No newline at end of file