You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/10/23 18:10:42 UTC

[1/6] ignite git commit: Implements distributed semaphore ignite-638

Repository: ignite
Updated Branches:
  refs/heads/ignite-638 [created] 5f38a18e1


Implements distributed semaphore ignite-638

This interface provides a rich API for working with distributed semaphore.
Distributed semaphore provides functionality similar to java.util.concurrent.Semaphore.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9567ade
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9567ade
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9567ade

Branch: refs/heads/ignite-638
Commit: e9567adec4057b0a0528735d4934440f4cd9fee3
Parents: b08e0b2
Author: vladisav <vl...@gmail.com>
Authored: Fri Sep 25 11:14:18 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 1 04:26:48 2015 +0200

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  | 183 ++++++
 .../src/main/java/org/apache/ignite/Ignite.java |  14 +
 .../java/org/apache/ignite/IgniteSemaphore.java | 390 ++++++++++++
 .../apache/ignite/internal/IgniteKernal.java    |  19 +
 .../datastructures/DataStructuresProcessor.java | 161 ++++-
 .../datastructures/GridCacheSemaphoreEx.java    |  22 +
 .../datastructures/GridCacheSemaphoreImpl.java  | 619 +++++++++++++++++++
 .../datastructures/GridCacheSemaphoreState.java | 128 ++++
 .../datastructures/GridCacheSemaphoreValue.java | 115 ++++
 .../ignite/testframework/junits/IgniteMock.java |  10 +
 .../junits/multijvm/IgniteProcessProxy.java     |   7 +
 .../org/apache/ignite/IgniteSpringBean.java     |  11 +
 12 files changed, 1677 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
new file mode 100644
index 0000000..2ef242c
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -0,0 +1,183 @@
+package org.apache.ignite.examples.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+
+/**
+ * This example demonstrates cache based semaphore.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public class IgniteSemaphoreExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = IgniteSemaphoreExample.class.getSimpleName();
+
+    /** Number of items for each producer/consumer to produce/consume. */
+    private static final int ITEM_COUNT = 100;
+
+    /** Number of producers. */
+    private static final int NUM_PRODUCERS = 10;
+
+    /** Number of consumers. */
+    private static final int NUM_CONSUMERS = 10;
+
+    /** Synchronization semaphore name. */
+    private static final String syncName = IgniteSemaphoreExample.class.getSimpleName();
+
+    public static void main(String[] args) {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache atomic semaphore example started.");
+
+            // Initialize semaphore.
+            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName,0,false,true);
+
+            // Make name of semaphore.
+            final String semaphoreName = UUID.randomUUID().toString();
+
+            // Make name of mutex
+            final String mutexName = UUID.randomUUID().toString();
+
+            // Make shared resource
+            final String resourceName = UUID.randomUUID().toString();
+            IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+            cache.put(resourceName, new LinkedList<>());
+
+            // Initialize semaphore.
+            IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
+
+            // Initialize mutex.
+            IgniteSemaphore mutex = ignite.semaphore(mutexName,1,false,true);
+
+            // Start consumers on all cluster nodes.
+            for (int i = 0; i < NUM_CONSUMERS; i++)
+                ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
+
+            // Start producers on all cluster nodes.
+            for(int i = 0; i < NUM_PRODUCERS; i++)
+                ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
+
+            System.out.println("Master node is waiting for all other nodes to finish...");
+
+            // Wait for everyone to finish.
+            syncSemaphore.acquire(NUM_CONSUMERS + NUM_PRODUCERS);
+        }
+
+        System.out.flush();
+        System.out.println();
+        System.out.println("Finished semaphore example...");
+        System.out.println("Check all nodes for output (this node is also part of the cluster).");
+    }
+
+    /**
+     * Base closure holding the names of the semaphore, the mutex and the shared resource.
+     */
+    private abstract static class SemaphoreExampleClosure implements IgniteRunnable {
+        /** Semaphore name. */
+        protected final String semaphoreName;
+
+        /** Mutex name. */
+        protected final String mutexName;
+
+        /** Resource name. */
+        protected final String resourceName;
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        SemaphoreExampleClosure(String mutexName, String semaphoreName, String resourceName) {
+            this.semaphoreName = semaphoreName;
+            this.mutexName = mutexName;
+            this.resourceName = resourceName;
+        }
+    }
+
+    /**
+     * Closure which simply signals the semaphore.
+     */
+    private static class Producer extends SemaphoreExampleClosure {
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        public Producer(String mutexName, String semaphoreName, String resourceName) {
+            super(mutexName, semaphoreName, resourceName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+
+            for(int i=0;i<ITEM_COUNT;i++) {
+                mutex.acquire();
+
+                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                queue.add(Ignition.ignite().cluster().localNode().id().toString());
+                Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
+
+                mutex.release();
+
+                semaphore.release();
+            }
+
+            System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 0, true, true);
+            sync.release();
+        }
+    }
+
+    /**
+     * Closure which simply waits on semaphore.
+     */
+    private static class Consumer extends SemaphoreExampleClosure {
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        public Consumer(String mutexName, String semaphoreName, String resourceName) {
+            super(mutexName, semaphoreName, resourceName);
+        }
+
+        /** Consumes {@code ITEM_COUNT} items from the shared queue, then signals the sync semaphore once. */
+        @Override public void run() {
+            IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+
+            for(int i=0;i<ITEM_COUNT;i++) {
+                semaphore.acquire(); // Blocks until a producer has released a permit for a published item.
+
+                mutex.acquire(); // Guards the get/modify/put sequence on the shared queue below.
+
+                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                String data = queue.remove();
+                Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
+
+                mutex.release();
+            }
+
+            System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 0, true, true); // Count 0 matches main() and Producer; main waits for one release per task.
+            sync.release();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 0afccd0..dffb126 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -420,6 +420,20 @@ public interface Ignite extends AutoCloseable {
         throws IgniteException;
 
     /**
+     * Gets or creates semaphore. If semaphore is not found in cache and {@code create} flag
+     * is {@code true}, it is created using provided name and count parameter.
+     *
+     * @param name Name of the semaphore.
+     * @param cnt Count for new semaphore creation. Ignored if {@code create} flag is {@code false}.
+     * @param fair {@code True} to enable fairness.
+     * @param create Boolean flag indicating whether data structure should be created if does not exist.
+     * @return Semaphore for the given name.
+     * @throws IgniteException If semaphore could not be fetched or created.
+     */
+    public IgniteSemaphore semaphore(String name, int cnt, boolean fair, boolean create)
+            throws IgniteException;
+
+    /**
      * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
      * {@code null}.
      * If queue is present already, queue properties will not be changed. Use

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
new file mode 100644
index 0000000..0e29d00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -0,0 +1,390 @@
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * Instance of cache semaphore can be created by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public interface IgniteSemaphore extends Closeable{
+    /**
+     * Gets name of the semaphore.
+     *
+     * @return Name of the semaphore.
+     */
+    public String name();
+
+    /**
+     * Acquires a permit from this semaphore, blocking until one is
+     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of two things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread.
+     * </ul>
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * for a permit,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public void acquire() throws IgniteInterruptedException;
+
+    /**
+     * Acquires a permit from this semaphore, blocking until one is
+     * available.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
+     * while waiting for a permit then it will continue to wait, but the
+     * time at which the thread is assigned a permit may change compared to
+     * the time it would have received the permit had no interruption
+     * occurred.  When the thread does return from this method its interrupt
+     * status will be set.
+     */
+    public void acquireUninterruptibly();
+
+    /**
+     * Acquires a permit from this semaphore, only if one is available at the
+     * time of invocation.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the value {@code true},
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then this method will return
+     * immediately with the value {@code false}.
+     *
+     * <p>Even when this semaphore has been set to use a
+     * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
+     * immediately acquire a permit if one is available, whether or not
+     * other threads are currently waiting.
+     * This &quot;barging&quot; behavior can be useful in certain
+     * circumstances, even though it breaks fairness. If you want to honor
+     * the fairness setting, then use
+     * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
+     * which is almost equivalent (it also detects interruption).
+     *
+     * @return {@code true} if a permit was acquired and {@code false}
+     *         otherwise
+     */
+    public boolean tryAcquire();
+
+    /**
+     * Acquires a permit from this semaphore, if one becomes available
+     * within the given waiting time and the current thread has not
+     * been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the value {@code true},
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of three things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If a permit is acquired then the value {@code true} is returned.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * to acquire a permit,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.
+     *
+     * @param timeout the maximum time to wait for a permit
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if a permit was acquired and {@code false}
+     *         if the waiting time elapsed before a permit was acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public boolean tryAcquire(long timeout, TimeUnit unit)
+            throws IgniteInterruptedException;
+
+    /**
+     * Acquires the given number of permits from this semaphore,
+     * blocking until all are available.
+     *
+     * <p>Acquires the given number of permits, if they are available,
+     * and returns immediately, reducing the number of available permits
+     * by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * some other thread invokes one of the {@link #release() release}
+     * methods for this semaphore, the current thread is next to be assigned
+     * permits and the number of available permits satisfies this request.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
+     * while waiting for permits then it will continue to wait and its
+     * position in the queue is not affected.  When the thread does return
+     * from this method its interrupt status will be set.
+     *
+     * @param permits the number of permits to acquire
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void acquireUninterruptibly(int permits);
+
+
+    /**
+     * Returns the current number of permits available in this semaphore.
+     *
+     * <p>This method is typically used for debugging and testing purposes.
+     *
+     * @return the number of permits available in this semaphore
+     */
+    public int availablePermits();
+
+    /**
+     * Acquires and returns all permits that are immediately available.
+     *
+     * @return the number of permits acquired
+     */
+    public int drainPermits();
+
+    /**
+     * Releases a permit, returning it to the semaphore.
+     *
+     * <p>Releases a permit, increasing the number of available permits by
+     * one.  If any threads are trying to acquire a permit, then one is
+     * selected and given the permit that was just released.  That thread
+     * is (re)enabled for thread scheduling purposes.
+     *
+     * <p>There is no requirement that a thread that releases a permit must
+     * have acquired that permit by calling {@link #acquire}.
+     * Correct usage of a semaphore is established by programming convention
+     * in the application.
+     */
+    public void release();
+
+    /**
+     * Acquires the given number of permits from this semaphore, if all
+     * become available within the given waiting time and the current
+     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available and
+     * returns immediately, with the value {@code true},
+     * reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then
+     * the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of three things happens:
+     * <ul>
+     * <li>Some other thread invokes one of the {@link #release() release}
+     * methods for this semaphore, the current thread is next to be assigned
+     * permits and the number of available permits satisfies this request; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If the permits are acquired then the value {@code true} is returned.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * to acquire the permits,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * Any permits that were to be assigned to this thread, are instead
+     * assigned to other threads trying to acquire permits, as if
+     * the permits had been made available by a call to {@link #release()}.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.  Any permits that were to be assigned to this
+     * thread, are instead assigned to other threads trying to acquire
+     * permits, as if the permits had been made available by a call to
+     * {@link #release()}.
+     *
+     * @param permits the number of permits to acquire
+     * @param timeout the maximum time to wait for the permits
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if all permits were acquired and {@code false}
+     *         if the waiting time elapsed before all permits were acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+            throws IgniteInterruptedException;
+
+    /**
+     * Acquires the given number of permits from this semaphore, only
+     * if all are available at the time of invocation.
+     *
+     * <p>Acquires the given number of permits, if they are available, and
+     * returns immediately, with the value {@code true},
+     * reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then this method will return
+     * immediately with the value {@code false} and the number of available
+     * permits is unchanged.
+     *
+     * <p>Even when this semaphore has been set to use a fair ordering
+     * policy, a call to {@code tryAcquire} <em>will</em>
+     * immediately acquire a permit if one is available, whether or
+     * not other threads are currently waiting.  This
+     * &quot;barging&quot; behavior can be useful in certain
+     * circumstances, even though it breaks fairness. If you want to
+     * honor the fairness setting, then use {@link #tryAcquire(int,
+     * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
+     * which is almost equivalent (it also detects interruption).
+     *
+     * @param permits the number of permits to acquire
+     * @return {@code true} if the permits were acquired and
+     *         {@code false} otherwise
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public boolean tryAcquire(int permits);
+
+    /**
+     * Acquires the given number of permits from this semaphore,
+     * blocking until all are available,
+     * or the thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available,
+     * and returns immediately, reducing the number of available permits
+     * by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of two things happens:
+     * <ul>
+     * <li>Some other thread invokes one of the {@link #release() release}
+     * methods for this semaphore, the current thread is next to be assigned
+     * permits and the number of available permits satisfies this request; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread.
+     * </ul>
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * for a permit,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * Any permits that were to be assigned to this thread are instead
+     * assigned to other threads trying to acquire permits, as if
+     * permits had been made available by a call to {@link #release()}.
+     *
+     * @param permits the number of permits to acquire
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void acquire(int permits) throws IgniteInterruptedException;
+
+    /**
+     * Releases the given number of permits, returning them to the semaphore.
+     *
+     * <p>Releases the given number of permits, increasing the number of
+     * available permits by that amount.
+     * If any threads are trying to acquire permits, then one
+     * is selected and given the permits that were just released.
+     * If the number of available permits satisfies that thread's request
+     * then that thread is (re)enabled for thread scheduling purposes;
+     * otherwise the thread will wait until sufficient permits are available.
+     * If there are still permits available
+     * after this thread's request has been satisfied, then those permits
+     * are assigned in turn to other threads trying to acquire permits.
+     *
+     * <p>There is no requirement that a thread that releases a permit must
+     * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
+     * Correct usage of a semaphore is established by programming convention
+     * in the application.
+     *
+     * @param permits the number of permits to release
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void release(int permits);
+
+    /**
+     * Returns {@code true} if this semaphore has fairness set true.
+     *
+     * @return {@code true} if this semaphore has fairness set true
+     */
+    public boolean isFair();
+
+    /**
+     * Queries whether any threads are waiting to acquire. Note that
+     * because cancellations may occur at any time, a {@code true}
+     * return does not guarantee that any other thread will ever
+     * acquire.  This method is designed primarily for use in
+     * monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads waiting to
+     *         acquire permits from this semaphore
+     */
+    public boolean hasQueuedThreads();
+
+    /**
+     * Returns an estimate of the number of threads waiting to acquire.
+     * The value is only an estimate because the number of threads may
+     * change dynamically while this method traverses internal data
+     * structures.  This method is designed for use in monitoring of the
+     * system state, not for synchronization control.
+     *
+     * @return the estimated number of threads waiting to acquire permits from this semaphore
+     */
+    public int getQueueLength();
+
+    /**
+     * Gets {@code removed} status of the semaphore.
+     *
+     * @return {@code True} if semaphore was removed from cache, {@code false} otherwise.
+     */
+    public boolean removed();
+
+    /**
+     * Removes this semaphore.
+     *
+     * @throws IgniteException If operation failed.
+     */
+    @Override public void close();
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..756278a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
@@ -2890,6 +2891,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(String name,
+       int cnt,
+       boolean fair,
+       boolean create) {
+        guard();
+
+        try {
+            return ctx.dataStructures().semaphore(name, cnt, fair, create);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index ef2c543..220dec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -42,6 +42,7 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
@@ -89,6 +90,7 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -131,6 +133,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** Cache contains only {@code GridCacheCountDownLatchValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView;
 
+    /** Cache contains only {@code GridCacheSemaphoreState}. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView;
+
     /** Cache contains only {@code GridCacheAtomicReferenceValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
 
@@ -187,6 +192,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             cntDownLatchView = atomicsCache;
 
+            semaphoreView = atomicsCache;
+
             atomicLongView = atomicsCache;
 
             atomicRefView = atomicsCache;
@@ -1176,6 +1183,133 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Gets or creates semaphore. If semaphore is not found in cache,
+     * it is created using provided name and count parameter.
+     *
+     * @param name Name of the semaphore.
+     * @param cnt Initial count.
+     * @param fair {@code True} Fairness parameter.
+     * @param create If {@code true} semaphore will be created in case it is not in cache,
+     *      if it is {@code false} all parameters except {@code name} are ignored.
+     * @return Semaphore for the given name or {@code null} if it is not found and
+     *      {@code create} is false.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public IgniteSemaphore semaphore(final String name,
+                                               final int cnt,
+                                               final boolean fair,
+                                               final boolean create)
+            throws IgniteCheckedException
+    {
+        A.notNull(name, "name");
+
+        awaitInitialization();
+
+        if (create)
+            A.ensure(cnt >= 0, "count can not be negative");
+
+        checkAtomicsConfiguration();
+
+        startQuery();
+
+        return getAtomic(new IgniteOutClosureX<IgniteSemaphore>() {
+            @Override public IgniteSemaphore applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                    // Check that count down hasn't been created in other thread yet.
+                    GridCacheSemaphoreEx semaphore = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
+
+                    if (semaphore != null) {
+                        assert val != null;
+
+                        return semaphore;
+                    }
+
+                    if (val == null && !create)
+                        return null;
+
+                    if (val == null) {
+                        val = new GridCacheSemaphoreState(cnt, 0);
+
+                        dsView.put(key, val);
+                    }
+
+                    semaphore = new GridCacheSemaphoreImpl(name, val.getCnt(),
+                            fair,
+                            key,
+                            semaphoreView,
+                            dsCacheCtx);
+
+                    dsMap.put(key, semaphore);
+
+                    tx.commit();
+
+                    return semaphore;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to create count down latch: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, SEMAPHORE, null), create, GridCacheSemaphoreEx.class);
+    }
+
+    /**
+     * Removes the semaphore with the given name from the data structures cache.
+     * If no semaphore state is stored under the name, the transaction is marked rollback-only.
+     *
+     * @param name Name of the semaphore.
+     * @throws IgniteCheckedException If operation failed or the stored permit count is negative.
+     */
+    public void removeSemaphore(final String name) throws IgniteCheckedException {
+        assert name != null;
+        assert dsCacheCtx != null;
+
+        awaitInitialization();
+
+        removeDataStructure(new IgniteOutClosureX<Void>() {
+            @Override public Void applyx() throws IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Check that the object being removed has the expected type.
+                    GridCacheSemaphoreState state = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                    if (state == null)
+                        tx.setRollbackOnly();
+                    else {
+                        if (state.getCnt() < 0)
+                            throw new IgniteCheckedException("Failed to remove semaphore " +
+                                "with blocked threads. ");
+
+                        dsView.remove(key);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, name, SEMAPHORE, null);
+    }
+
+    /**
      * Remove internal entry by key from cache.
      *
      * @param key Internal entry key.
@@ -1222,7 +1356,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
             if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
-                return evt.getValue() instanceof GridCacheCountDownLatchValue;
+                return evt.getValue() instanceof GridCacheCountDownLatchValue ||
+                        evt.getValue() instanceof GridCacheSemaphoreState;
             else {
                 assert evt.getEventType() == EventType.REMOVED : evt;
 
@@ -1297,6 +1432,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 ", actual=" + latch.getClass() + ", value=" + latch + ']');
                         }
                     }
+                    else if(val0 instanceof GridCacheSemaphoreState) {
+                        GridCacheInternalKey key = evt.getKey();
+
+                        // Notify semaphore on changes.
+                        final GridCacheRemovable semaphore = dsMap.get(key);
+
+                        GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0;
+
+                        if (semaphore instanceof GridCacheSemaphoreEx) {
+                            final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)semaphore;
+
+                            semaphore0.onUpdate(val);
+                        }
+                        else if (semaphore != null) {
+                            U.error(log, "Failed to cast object " +
+                                    "[expected=" + IgniteSemaphore.class.getSimpleName() +
+                                    ", actual=" + semaphore.getClass() + ", value=" + semaphore + ']');
+                        }
+                    }
 
                 }
                 else {
@@ -1514,7 +1668,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         QUEUE(IgniteQueue.class.getSimpleName()),
 
         /** */
-        SET(IgniteSet.class.getSimpleName());
+        SET(IgniteSet.class.getSimpleName()),
+
+        /** */
+        SEMAPHORE(IgniteSemaphore.class.getSimpleName());
 
         /** */
         private static final DataStructureType[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
new file mode 100644
index 0000000..0f939d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -0,0 +1,22 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.IgniteSemaphore;
+
+/**
+ * Internal view of {@link IgniteSemaphore} that also exposes its cache key and a state-update callback.
+ */
+public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
+    /**
+     * Get current semaphore key.
+     *
+     * @return Semaphore key.
+     */
+    public GridCacheInternalKey key();
+
+    /**
+     * Callback to notify semaphore on changes.
+     *
+     * @param val New semaphore state.
+     */
+    public void onUpdate(GridCacheSemaphoreState val);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
new file mode 100644
index 0000000..24c3ec5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -0,0 +1,619 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer.
+ * Current implementation supports only unfair and locally fair modes.
+ * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
+ * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
+ * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
+ *
+ * @author Vladisav Jelisavcic
+ */
+public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deserialization stash. */
+    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
+            new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+                @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+                    return F.t2();
+                }
+            };
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Semaphore name. */
+    private String name;
+
+    /** Removed flag.*/
+    private volatile boolean rmvd;
+
+    /** Semaphore key. */
+    private GridCacheInternalKey key;
+
+    /** Semaphore projection. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView;
+
+    /** Cache context. */
+    private GridCacheContext ctx;
+
+    /** Fairness flag. */
+    private boolean isFair;
+
+    /** Initial count. */
+    private transient final int initCnt;
+
+    /** Initialization guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Initialization latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /** Internal synchronization object. */
+    private Sync sync;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridCacheSemaphoreImpl() {
+        // Only the final field is assigned here; all other fields keep their defaults.
+        initCnt = 0;
+    }
+
+    /**
+     * Synchronization implementation for semaphore.  Uses AQS state
+     * to represent permits. Subclassed into fair and nonfair
+     * versions.
+     */
+    abstract class Sync extends AbstractQueuedSynchronizer {
+        private static final long serialVersionUID = 1192457210091910933L;
+
+        protected ConcurrentMap<Thread,Integer> threadMap;
+        protected int totalWaiters;
+
+        Sync(int permits) {
+            setState(permits);
+            threadMap = new ConcurrentHashMap<>();
+        }
+
+        protected synchronized void setWaiters(int waiters){
+            totalWaiters = waiters;
+        }
+
+        public synchronized int getWaiters() { // Same monitor as setWaiters(), so its writes are visible here.
+            return totalWaiters;
+        }
+
+        final synchronized void setPermits(int permits){
+            setState(permits);
+        }
+
+        final int getPermits() {
+            return getState();
+        }
+
+        final int nonfairTryAcquireShared(int acquires) {
+            while (true) {
+                int avail = getState();
+                int left = avail - acquires;
+
+                // Too few permits: register this thread as a waiter unless it is already tracked in threadMap.
+                if (left < 0) {
+                    if (!threadMap.containsKey(Thread.currentThread()))
+                        getAndIncWaitingCount();
+                    return left;
+                }
+                if (compareAndSetGlobalState(avail, left))
+                    return left;
+            }
+        }
+
+        @Override protected final boolean tryReleaseShared(int releases) {
+            // A zero release is issued only to wake waiters after another node changed the state,
+            // so there is nothing to write back.
+            if (releases == 0)
+                return true;
+
+            while (true) {
+                int cur = getState();
+                int updated = cur + releases;
+                if (updated < cur) // overflow
+                    throw new Error("Maximum permit count exceeded");
+                if (compareAndSetGlobalState(cur, updated))
+                    return true;
+            }
+        }
+
+        final void reducePermits(int reductions) {
+            int cur;
+            int lowered;
+            do {
+                cur = getState();
+                lowered = cur - reductions;
+                if (lowered > cur) // underflow
+                    throw new Error("Permit count underflow");
+            } while (!compareAndSetGlobalState(cur, lowered));
+        }
+
+        final int drainPermits() {
+            int cur;
+            do {
+                cur = getState();
+            } while (cur != 0 && !compareAndSetGlobalState(cur, 0));
+            return cur;
+        }
+
+        /** Increments the waiter count stored in cache and remembers the previous count for the current thread. */
+        protected void getAndIncWaitingCount() {
+            try {
+                CU.outTx(
+                        retryTopologySafe(new Callable<Boolean>() {
+                            @Override public Boolean call() throws Exception {
+                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                    if (val == null)
+                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+
+                                    int waiting = val.getWaiters();
+                                    sync.threadMap.put(Thread.currentThread(), waiting);
+
+                                    waiting++;
+                                    val.setWaiters(waiting);
+                                    semaphoreView.put(key, val);
+                                    tx.commit();
+
+                                    return true;
+                                } catch (Error | Exception e) {
+                                    // Log the semaphore name; "this" would be the anonymous Callable here.
+                                    U.error(log, "Failed to increment waiting count: " + name, e);
+                                    throw e;
+                                }
+                            }
+                        }),
+                        ctx
+                );
+            } catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
+            try {
+                return CU.outTx(
+                        retryTopologySafe(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                    if (val == null)
+                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+
+                                    boolean retVal = val.getCnt() == expVal;
+
+                                    if (retVal) {
+                                        /* If current thread is queued, than this call is the call that is going to be unblocked. */
+                                        if(sync.isQueued(Thread.currentThread())) {
+
+                                            int waiting = val.getWaiters() - 1;
+                                            val.setWaiters(waiting);
+
+                                            sync.threadMap.remove(Thread.currentThread());
+                                        }
+
+                                        val.setCnt(newVal);
+
+                                        semaphoreView.put(key, val);
+                                        tx.commit();
+                                    }
+
+                                    return retVal;
+                                } catch (Error | Exception e) {
+                                    U.error(log, "Failed to compare and set: " + this, e);
+
+                                    throw e;
+                                }
+                            }
+                        }),
+                        ctx
+                );
+            } catch( IgniteCheckedException e){
+                throw U.convertException(e);
+            }
+        }
+    }
+
+    /**
+     * Sync variant that makes no ordering guarantees: acquisition is delegated to the shared non-fair routine.
+     */
+    final class NonfairSync extends Sync {
+        private static final long serialVersionUID = 7983135489326435495L;
+
+        NonfairSync(int permits) {
+            super(permits);
+        }
+
+        @Override protected int tryAcquireShared(int acquires) {
+            return nonfairTryAcquireShared(acquires);
+        }
+    }
+
+    /**
+     * Sync variant that is fair among local threads: a thread does not try to acquire while other
+     * threads are queued ahead of it in the local wait queue.
+     */
+    final class FairSync extends Sync {
+        private static final long serialVersionUID = 3468129658421667L;
+
+        FairSync(int permits) {
+            super(permits);
+        }
+
+        @Override protected int tryAcquireShared(int acquires) {
+            while (true) {
+                if (hasQueuedPredecessors())
+                    return -1;
+
+                int avail = getState();
+                int left = avail - acquires;
+
+                // Too few permits: register this thread as a waiter unless it is already tracked in threadMap.
+                if (left < 0) {
+                    if (!threadMap.containsKey(Thread.currentThread()))
+                        getAndIncWaitingCount();
+                    return left;
+                }
+                if (compareAndSetGlobalState(avail, left))
+                    return left;
+            }
+        }
+    }
+
+    /**
+     * Creates a semaphore handle bound to the given cache key and view.
+     *
+     * @param name Semaphore name.
+     * @param initCnt Initial count.
+     * @param fair Fairness flag.
+     * @param key Semaphore key.
+     * @param semaphoreView Semaphore projection.
+     * @param ctx Cache context.
+     */
+    public GridCacheSemaphoreImpl(String name,
+        int initCnt,
+        boolean fair,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+        GridCacheContext ctx) {
+        assert name != null;
+        assert key != null;
+        assert semaphoreView != null;
+        assert ctx != null;
+
+        this.ctx = ctx;
+        this.key = key;
+        this.name = name;
+        this.isFair = fair;
+        this.initCnt = initCnt;
+        this.semaphoreView = semaphoreView;
+
+        log = ctx.logger(getClass());
+    }
+
+    /**
+     * Lazily creates the local sync object from the cached state; concurrent callers wait for the first one.
+     *
+     * @throws IgniteCheckedException If operation failed or the semaphore state was not found in cache.
+     */
+    private void initializeSemaphore() throws IgniteCheckedException {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                sync = CU.outTx(
+                        retryTopologySafe(new Callable<Sync>() {
+                            @Override public Sync call() throws Exception {
+                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                    if (val == null) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to find semaphore with given name: " + name);
+                                        return null;
+                                    }
+
+                                    final int count = val.getCnt();
+                                    tx.commit();
+
+                                    return val.isFair() ? new FairSync(count) : new NonfairSync(count);
+                                }
+                            }
+                        }),
+                        ctx
+                );
+                if (log.isDebugEnabled())
+                    log.debug("Initialized internal sync structure: " + sync);
+            }
+            finally {
+                initLatch.countDown();
+            }
+        }
+        else
+            U.await(initLatch);
+
+        // Checked on every path: the initializing thread must also fail here instead of hitting an NPE later.
+        if (sync == null)
+            throw new IgniteCheckedException("Internal semaphore has not been properly initialized.");
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheInternalKey key() { return key; }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed(){ return rmvd; }
+
+    /** {@inheritDoc} */
+    @Override public boolean onRemoved() {
+        return rmvd = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdate(GridCacheSemaphoreState val) {
+        // Nothing to refresh until the local sync object has been initialized.
+        if (sync == null)
+            return;
+
+        // Mirror the cached permit and waiter counts locally.
+        sync.setPermits(val.getCnt());
+
+        sync.setWaiters(val.getWaiters());
+
+        // A zero release changes no permits; it is used to notify waiting threads.
+        sync.releaseShared(0);
+    }
+
+    @Override
+    public void needCheckNotRemoved() {
+        // No-op.
+    }
+
+    @Override
+    public void acquire() throws IgniteException {
+        acquire(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acquire(int permits) throws IgniteInterruptedException {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+            sync.acquireSharedInterruptibly(permits);
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        } catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void acquireUninterruptibly() {
+        try {
+            initializeSemaphore();
+
+            sync.acquireShared(1);
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acquireUninterruptibly(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+            sync.acquireShared(permits);
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int availablePermits() {
+        try {
+            initializeSemaphore();
+
+            // Read the permit count straight from the cache; the transaction only reads and is rolled back.
+            return CU.outTx(
+                    retryTopologySafe(new Callable<Integer>() {
+                        @Override public Integer call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState state = semaphoreView.get(key);
+
+                                if (state == null)
+                                    throw new IgniteException("Failed to find semaphore with given name: " + name);
+
+                                int permits = state.getCnt();
+
+                                tx.rollback();
+
+                                return permits;
+                            }
+                        }
+                    }),
+                    ctx
+            );
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int drainPermits() {
+        try {
+            initializeSemaphore();
+
+            return sync.drainPermits();
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire() {
+        try {
+            initializeSemaphore();
+            // NOTE(review): on failure nonfairTryAcquireShared() still registers this thread as a waiter.
+            return sync.nonfairTryAcquireShared(1) >= 0;
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+        try {
+            initializeSemaphore();
+
+            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        } catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    @Override
+    public void release() {
+        release(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+            sync.releaseShared(permits);
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+            // NOTE(review): on failure nonfairTryAcquireShared() still registers this thread as a waiter.
+            return sync.nonfairTryAcquireShared(permits) >= 0;
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        } catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isFair() {
+        return isFair;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThreads() {
+        try {
+            initializeSemaphore();
+            return sync.getWaiters() != 0;
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getQueueLength() {
+        try {
+            initializeSemaphore();
+            return sync.getWaiters();
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx.kernalContext());
+        out.writeUTF(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // The kernal context and the name are put into the thread-local stash, not into this instance.
+        IgniteBiTuple<GridKernalContext, String> stashed = stash.get();
+        stashed.set1((GridKernalContext)in.readObject());
+        stashed.set2(in.readUTF());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Nothing to do if the semaphore has already been marked as removed.
+        if (rmvd)
+            return;
+        try {
+            ctx.kernalContext().dataStructures().removeSemaphore(name);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSemaphoreImpl.class, this);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
new file mode 100644
index 0000000..cf44b7d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -0,0 +1,128 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+
+/**
+ * Grid cache semaphore state: holds the permission count, the waiters value and the fairness flag.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Permission count.
+     */
+    private int cnt;
+
+    /**
+     * Waiters value; the field name suggests a count of waiters rather than an id (TODO confirm against callers).
+     */
+    private int waiters;
+
+    /**
+     * Fairness flag.
+     */
+    private boolean fair;
+
+
+    /**
+     * Creates a state whose fairness flag is {@code false}.
+     * @param cnt Number of permissions.
+     * @param waiters Initial waiters value.
+     */
+    public GridCacheSemaphoreState(int cnt, int waiters) {
+        this.cnt = cnt;
+        this.waiters = waiters;
+        this.fair = false;
+    }
+
+    /**
+     * @param cnt Number of permissions.
+     * @param waiters Initial waiters value.
+     * @param fair Fairness flag.
+     */
+    public GridCacheSemaphoreState(int cnt, int waiters, boolean fair) {
+        this.cnt = cnt;
+        this.waiters = waiters;
+        this.fair = fair;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreState() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void setCnt(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int getCnt() {
+        return cnt;
+    }
+
+    public int getWaiters() { // Returns the stored waiters value.
+        return waiters;
+    }
+
+    public void setWaiters(int id) { // NOTE(review): the parameter is named id but is stored as the waiters value.
+        this.waiters = id;
+    }
+
+    public boolean isFair() { // Returns the fairness flag.
+        return fair;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /**
+     * {@inheritDoc} Writes {@code cnt}, {@code waiters} and {@code fair}, in that order.
+     */
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeInt(waiters);
+        out.writeBoolean(fair);
+    }
+
+    /**
+     * {@inheritDoc} Reads the fields in the order {@link #writeExternal(ObjectOutput)} writes them.
+     */
+    @Override
+    public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        waiters = in.readInt();
+        fair = in.readBoolean();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString() {
+        return S.toString(GridCacheSemaphoreState.class, this);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
new file mode 100644
index 0000000..689b647
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
@@ -0,0 +1,115 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Grid cache semaphore value: holds a signed permission count together with a semaphore ID.
+ */
+public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Permission count; its sign is what {@link #isRelease()} and {@link #isAwait()} test.
+     */
+    private int cnt;
+
+    /**
+     * Semaphore ID.
+     */
+    private long semaphoreId;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Number of permissions.
+     * @param semaphoreId Semaphore ID.
+     */
+    public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
+        this.cnt = cnt;
+
+        this.semaphoreId = semaphoreId;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreValue() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void set(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int get() {
+        return cnt;
+    }
+
+    /**
+     * @return {@code true} if the count is positive, i.e. permissions are to be added.
+     */
+    public boolean isRelease(){
+        return cnt>0;
+    }
+
+    /**
+     * @return {@code true} if the count is negative, i.e. the permission count should be lowered.
+     */
+    public boolean isAwait(){
+        return cnt<0;
+    }
+
+    /**
+     * @return Semaphore ID.
+     */
+    public long semaphoreId() {
+        return semaphoreId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /**
+     * {@inheritDoc} Writes {@code cnt} followed by {@code semaphoreId}.
+     */
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeLong(semaphoreId);
+    }
+
+    /**
+     * {@inheritDoc} Reads the fields in the order {@link #writeExternal(ObjectOutput)} writes them.
+     */
+    @Override
+    public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        semaphoreId = in.readLong();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString() {
+        return S.toString(GridCacheSemaphoreValue.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 964753d..b633a6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteFileSystem;
@@ -308,6 +309,15 @@ public class IgniteMock implements Ignite {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(String name,
+        int cnt,
+        boolean fair,
+        boolean create)
+    {
+        return null; // This mock never provides a semaphore; all arguments are ignored.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index ec7dab7..495fd6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
@@ -536,6 +537,12 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSemaphore semaphore(String name, int cnt, boolean fair,
+        boolean create) throws IgniteException {
+        throw new UnsupportedOperationException("Operation isn't supported yet."); // Not implemented by this proxy; it always throws.
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> IgniteQueue<T> queue(String name, int cap,
         @Nullable CollectionConfiguration cfg) throws IgniteException {
         throw new UnsupportedOperationException("Operation isn't supported yet.");

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 42514e3..4867a10 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -398,6 +398,17 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(String name,
+        int cnt,
+        boolean fair,
+        boolean create)
+    {
+        assert g != null; // NOTE(review): g is presumably the wrapped Ignite instance - confirm where it is assigned.
+
+        return g.semaphore(name, cnt, fair, create); // Pure delegation; the arguments are passed through unchanged.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)


[5/6] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-sem-review-1

Posted by yz...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-sem-review-1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad99064e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad99064e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad99064e

Branch: refs/heads/ignite-638
Commit: ad99064e87c42d91dfc303d8d87861d2ed003bed
Parents: 8c6852d 0bc1d6f
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Oct 23 17:40:05 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Oct 23 17:40:05 2015 +0300

----------------------------------------------------------------------
 DEVNOTES.txt                                    |    3 +-
 RELEASE_NOTES.txt                               |    2 +
 assembly/dependencies-fabric.xml                |    2 +-
 assembly/dependencies-hadoop.xml                |    1 +
 assembly/release-fabric-lgpl.xml                |   63 +
 assembly/release-fabric.xml                     |   10 +-
 assembly/release-hadoop-lgpl.xml                |   39 +
 examples-lgpl/README.txt                        |   27 +
 examples-lgpl/config/example-cache.xml          |   73 +
 examples-lgpl/config/example-ignite.xml         |   83 ++
 examples-lgpl/config/hibernate/README.txt       |    8 +
 .../hibernate/example-hibernate-L2-cache.xml    |   64 +
 examples-lgpl/pom-standalone.xml                |  186 +++
 examples-lgpl/pom.xml                           |  128 ++
 .../hibernate/HibernateL2CacheExample.java      |  245 ++++
 .../examples/datagrid/hibernate/Post.java       |  130 ++
 .../examples/datagrid/hibernate/User.java       |  154 ++
 .../datagrid/hibernate/package-info.java        |   22 +
 .../hibernate/CacheHibernatePersonStore.java    |  122 ++
 .../hibernate/CacheHibernateStoreExample.java   |  151 ++
 .../datagrid/store/hibernate/Person.hbm.xml     |   34 +
 .../datagrid/store/hibernate/hibernate.cfg.xml  |   41 +
 .../datagrid/store/hibernate/package-info.java  |   22 +
 .../misc/schedule/ComputeScheduleExample.java   |   82 ++
 .../examples/misc/schedule/package-info.java    |   22 +
 .../misc/schedule/ComputeScheduleExample.java   |   68 +
 .../java8/misc/schedule/package-info.java       |   22 +
 .../ignite/examples/java8/package-info.java     |   23 +
 .../scalar/examples/ScalarScheduleExample.scala |   66 +
 ...ibernateL2CacheExampleMultiNodeSelfTest.java |   31 +
 .../HibernateL2CacheExampleSelfTest.java        |   33 +
 .../IgniteLgplExamplesSelfTestSuite.java        |   48 +
 ...ibernateL2CacheExampleMultiNodeSelfTest.java |   29 +
 .../HibernateL2CacheExampleSelfTest.java        |   37 +
 .../IgniteLgplExamplesJ8SelfTestSuite.java      |   46 +
 .../ScalarLgplExamplesMultiNodeSelfTest.scala   |   33 +
 .../examples/ScalarLgplExamplesSelfTest.scala   |   36 +
 .../ScalarLgplExamplesSelfTestSuite.scala       |   37 +
 examples/config/hibernate/README.txt            |    8 -
 .../hibernate/example-hibernate-L2-cache.xml    |   64 -
 examples/pom-standalone.xml                     |   12 -
 examples/pom.xml                                |   12 -
 examples/schema-import/pom-standalone.xml       |   90 ++
 examples/schema-import/pom.xml                  |   13 +-
 .../hibernate/HibernateL2CacheExample.java      |  245 ----
 .../examples/datagrid/hibernate/Post.java       |  130 --
 .../examples/datagrid/hibernate/User.java       |  154 --
 .../datagrid/hibernate/package-info.java        |   22 -
 .../hibernate/CacheHibernatePersonStore.java    |  122 --
 .../hibernate/CacheHibernateStoreExample.java   |  151 --
 .../datagrid/store/hibernate/Person.hbm.xml     |   34 -
 .../datagrid/store/hibernate/hibernate.cfg.xml  |   41 -
 .../datagrid/store/hibernate/package-info.java  |   22 -
 .../misc/schedule/ComputeScheduleExample.java   |   82 --
 .../examples/misc/schedule/package-info.java    |   22 -
 .../misc/schedule/ComputeScheduleExample.java   |   68 -
 .../java8/misc/schedule/package-info.java       |   22 -
 .../scalar/examples/ScalarScheduleExample.scala |   66 -
 ...ibernateL2CacheExampleMultiNodeSelfTest.java |   31 -
 .../HibernateL2CacheExampleSelfTest.java        |   33 -
 .../testsuites/IgniteExamplesSelfTestSuite.java |    4 -
 ...ibernateL2CacheExampleMultiNodeSelfTest.java |   29 -
 .../HibernateL2CacheExampleSelfTest.java        |   37 -
 .../tests/examples/ScalarExamplesSelfTest.scala |    5 -
 modules/apache-license-gen/README.txt           |   33 +
 modules/apache-license-gen/pom.xml              |    3 +
 .../JettyRestProcessorAbstractSelfTest.java     |  252 +++-
 .../org/apache/ignite/IgniteFileSystem.java     |    2 +
 .../apache/ignite/IgniteSystemProperties.java   |    3 +
 .../org/apache/ignite/IgniteTransactions.java   |    4 -
 .../discovery/GridDiscoveryManager.java         |   57 +-
 .../internal/portable/PortableContext.java      |   19 +-
 .../portable/api/PortableMarshaller.java        |   14 +-
 .../cache/DynamicCacheDescriptor.java           |   17 +
 .../processors/cache/GridCacheAdapter.java      |  357 +++--
 .../cache/GridCacheClearAllRunnable.java        |   18 +-
 .../cache/GridCacheConcurrentMap.java           |    4 +-
 .../processors/cache/GridCacheContext.java      |    2 +-
 .../processors/cache/GridCacheIoManager.java    |   29 +-
 .../processors/cache/GridCacheMvccManager.java  |   20 +-
 .../GridCachePartitionExchangeManager.java      |   72 +-
 .../processors/cache/GridCacheProcessor.java    |   57 +-
 .../processors/cache/GridCacheProxyImpl.java    |   14 +-
 .../processors/cache/IgniteCacheProxy.java      |    2 +-
 .../processors/cache/IgniteInternalCache.java   |   19 +-
 .../dht/GridClientPartitionTopology.java        |   13 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |    6 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |    4 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    3 +-
 .../distributed/dht/GridDhtLockRequest.java     |    5 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   12 +-
 .../dht/GridPartitionedGetFuture.java           |    5 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   17 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   11 +-
 .../GridDhtPartitionDemandMessage.java          |    4 +-
 .../GridDhtPartitionSupplyMessage.java          |    3 +-
 .../GridDhtPartitionsExchangeFuture.java        |   12 +-
 .../preloader/GridDhtPartitionsFullMessage.java |   12 +-
 .../GridDhtPartitionsSingleMessage.java         |   11 +-
 .../dht/preloader/GridDhtPreloader.java         |   18 +-
 .../distributed/near/GridNearCacheAdapter.java  |   21 +-
 .../near/GridNearCacheClearAllRunnable.java     |    9 +-
 .../distributed/near/GridNearGetFuture.java     |    2 +
 .../distributed/near/GridNearLockFuture.java    |   11 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   24 +-
 .../cache/query/GridCacheQueryManager.java      |   33 +-
 .../cache/query/GridCacheSqlIndexMetadata.java  |    7 +-
 .../cache/query/GridCacheSqlMetadata.java       |   22 +-
 .../datastructures/DataStructuresProcessor.java |   48 +-
 .../processors/igfs/IgfsDataManager.java        |    2 -
 .../processors/igfs/IgfsDeleteWorker.java       |  102 +-
 .../internal/processors/igfs/IgfsFileInfo.java  |   13 +-
 .../internal/processors/igfs/IgfsImpl.java      |  228 +--
 .../processors/igfs/IgfsMetaManager.java        |  909 ++++++++----
 .../processors/igfs/IgfsOutputStreamImpl.java   |    2 +
 .../internal/processors/igfs/IgfsUtils.java     |   23 +
 .../processors/rest/GridRestCommand.java        |    8 +-
 .../processors/rest/GridRestProcessor.java      |  364 ++++-
 .../handlers/cache/GridCacheCommandHandler.java |  364 ++---
 .../handlers/query/QueryCommandHandler.java     |  195 ++-
 .../top/GridTopologyCommandHandler.java         |   31 +-
 .../rest/request/RestQueryRequest.java          |  175 +++
 .../rest/request/RestSqlQueryRequest.java       |  125 --
 .../ignite/internal/util/GridJavaProcess.java   |   12 +-
 .../ignite/internal/util/IgniteUtils.java       |    2 +-
 .../ignite/internal/util/lang/GridFunc.java     |   12 +
 .../ignite/internal/util/nio/GridNioServer.java |    2 +-
 .../apache/ignite/marshaller/Marshaller.java    |    2 +-
 .../optimized/OptimizedMarshallerUtils.java     |    6 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   22 +-
 .../ignite/spi/deployment/DeploymentSpi.java    |    8 +-
 modules/core/src/test/config/tests.properties   |    3 +
 .../ignite/igfs/IgfsEventsAbstractSelfTest.java |    6 +-
 .../cache/CacheAffinityCallSelfTest.java        |    4 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  486 ++++---
 .../cache/GridCacheClearSelfTest.java           |  308 ++++
 .../GridCacheDeploymentOffHeapSelfTest.java     |   15 +
 .../IgniteCacheConfigurationTemplateTest.java   |   31 +
 .../cache/IgniteCacheEntryListenerTxTest.java   |    4 +
 .../cache/IgniteCachePutAllRestartTest.java     |    4 +-
 .../CacheAbstractRestartSelfTest.java           |  247 ++++
 .../CacheGetFutureHangsSelfTest.java            |  159 +--
 ...NearDisabledAtomicInvokeRestartSelfTest.java |  179 +++
 ...abledTransactionalInvokeRestartSelfTest.java |  173 +++
 ...edTransactionalWriteReadRestartSelfTest.java |  124 ++
 .../CacheNoValueClassOnServerNodeTest.java      |    1 +
 ...niteCacheClientNodeChangingTopologyTest.java |   10 +-
 .../distributed/IgniteCacheCreatePutTest.java   |  125 ++
 .../dht/GridCacheDhtEntrySelfTest.java          |    2 +-
 .../dht/GridCacheDhtPreloadPerformanceTest.java |  133 ++
 .../dht/GridCacheTxNodeFailureSelfTest.java     |    2 +
 .../dht/GridNearCacheTxNodeFailureSelfTest.java |    4 +
 ...gniteAtomicLongChangingTopologySelfTest.java |  159 ++-
 ...tomicClientOnlyMultiNodeFullApiSelfTest.java |   71 +-
 ...ledFairAffinityMultiNodeFullApiSelfTest.java |    8 +-
 ...icOffHeapTieredMultiNodeFullApiSelfTest.java |    7 +-
 .../near/GridCacheNearTxExceptionSelfTest.java  |    4 +
 .../near/GridCacheNearTxForceKeyTest.java       |    2 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |  129 +-
 .../replicated/GridReplicatedTxPreloadTest.java |    7 +-
 ...bledFairAffinityMultiJvmFullApiSelfTest.java |    5 +
 ...tomicNearEnabledMultiJvmFullApiSelfTest.java |    5 +
 .../DataStreamerMultiThreadedSelfTest.java      |    4 +-
 .../DataStreamerMultinodeCreateCacheTest.java   |    2 +
 .../processors/igfs/IgfsAbstractSelfTest.java   |  812 ++++++++---
 .../igfs/IgfsDataManagerSelfTest.java           |   13 +-
 .../igfs/IgfsMetaManagerSelfTest.java           |  170 +--
 .../processors/igfs/IgfsProcessorSelfTest.java  |   12 +-
 .../nio/IgniteExceptionInNioWorkerSelfTest.java |  105 ++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |    2 +
 .../testframework/junits/GridAbstractTest.java  |  116 +-
 .../junits/IgniteTestResources.java             |    8 +-
 .../junits/common/GridCommonAbstractTest.java   |   15 +-
 .../junits/multijvm/AffinityProcessProxy.java   |  440 ++++--
 .../multijvm/IgniteCacheProcessProxy.java       | 1346 ++++++++++++++----
 .../multijvm/IgniteClusterProcessProxy.java     |  115 +-
 .../multijvm/IgniteEventsProcessProxy.java      |   50 +-
 .../junits/multijvm/IgniteNodeRunner.java       |   39 +-
 .../junits/multijvm/IgniteProcessProxy.java     |  107 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |    3 +
 .../IgniteCacheFullApiSelfTestSuite.java        |    8 +-
 .../IgniteCacheLoadConsistencyTestSuite.java    |   42 +
 .../testsuites/IgniteCacheTestSuite4.java       |    5 +
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |    6 +
 modules/extdata/uri/pom.xml                     |   11 +-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |   54 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |   27 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |    2 +-
 .../CacheHibernateBlobStoreSelfTest.java        |    6 +-
 .../cache/CacheConfigurationP2PTest.java        |    3 +
 .../cache/SqlFieldsQuerySelfTest.java           |  172 +++
 .../IgniteCacheQuerySelfTestSuite.java          |    2 +
 .../main/cpp/common/project/vs/common.vcxproj   |    4 +-
 .../http/jetty/GridJettyJsonConfig.java         |  158 +-
 .../http/jetty/GridJettyRestHandler.java        |  186 +--
 .../spi/deployment/uri/UriDeploymentSpi.java    |   93 +-
 .../scanners/http/UriDeploymentHttpScanner.java |   10 +-
 .../http/GridHttpDeploymentSelfTest.java        |  132 +-
 .../visor/commands/kill/VisorKillCommand.scala  |    2 +-
 .../scala/org/apache/ignite/visor/visor.scala   |    1 -
 modules/yardstick/pom.xml                       |   10 +-
 pom.xml                                         |   88 +-
 202 files changed, 10323 insertions(+), 4135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ad99064e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad99064e/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------


[6/6] ignite git commit: review

Posted by yz...@apache.org.
review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f38a18e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f38a18e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f38a18e

Branch: refs/heads/ignite-638
Commit: 5f38a18e1dcb03523d151e2693f7bd7c2fedca81
Parents: ad99064
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Oct 23 19:09:35 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Oct 23 19:09:35 2015 +0300

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  | 69 +++++++++++++-------
 .../src/main/java/org/apache/ignite/Ignite.java |  4 +-
 .../java/org/apache/ignite/IgniteSemaphore.java | 29 ++++++--
 .../datastructures/DataStructuresProcessor.java | 32 +++++----
 .../datastructures/GridCacheSemaphoreEx.java    | 17 +++++
 .../datastructures/GridCacheSemaphoreImpl.java  | 57 +++++++++++-----
 .../datastructures/GridCacheSemaphoreState.java | 55 +++++++++-------
 .../IgniteSemaphoreAbstractSelfTest.java        |  1 +
 .../IgnitePartitionedSemaphoreSelfTest.java     |  1 -
 9 files changed, 182 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index ece0ffc..1c078b0 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.examples.datastructures;
 
 import java.util.UUID;
@@ -8,17 +25,17 @@ import org.apache.ignite.examples.ExampleNodeStartup;
 import org.apache.ignite.lang.IgniteRunnable;
 
 /**
- * This example demonstrates cache based semaphore. <p> Remote nodes should always be started with special configuration
- * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. <p> Alternatively
- * you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * This example demonstrates cache based semaphore.
+ * <p>
+ * Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
  * examples/config/example-ignite.xml} configuration.
  */
 public class IgniteSemaphoreExample {
-    /** Cache name. */
-    private static final String CACHE_NAME = IgniteSemaphoreExample.class.getSimpleName();
-
     /** Number of items for each producer/consumer to produce/consume. */
-    private static final int ITEM_COUNT = 100;
+    private static final int OPS_COUNT = 100;
 
     /** Number of producers. */
     private static final int NUM_PRODUCERS = 10;
@@ -27,15 +44,20 @@ public class IgniteSemaphoreExample {
     private static final int NUM_CONSUMERS = 10;
 
     /** Synchronization semaphore name. */
-    private static final String syncName = IgniteSemaphoreExample.class.getSimpleName();
+    private static final String SEM_NAME = IgniteSemaphoreExample.class.getSimpleName();
 
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     */
     public static void main(String[] args) {
         try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
             System.out.println();
             System.out.println(">>> Cache atomic semaphore example started.");
 
             // Initialize semaphore.
-            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName, 0, false, true);
+            IgniteSemaphore syncSemaphore = ignite.semaphore(SEM_NAME, 0, false, true);
 
             // Make name of semaphore.
             final String semaphoreName = UUID.randomUUID().toString();
@@ -82,7 +104,6 @@ public class IgniteSemaphoreExample {
      * Closure which simply signals the semaphore.
      */
     private static class Producer extends SemaphoreExampleClosure {
-
         /**
          * @param semaphoreName Semaphore name.
          */
@@ -94,20 +115,21 @@ public class IgniteSemaphoreExample {
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
 
-            for (int i = 0; i < ITEM_COUNT; i++) {
-                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
+            for (int i = 0; i < OPS_COUNT; i++) {
+                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                    ", available=" + semaphore.availablePermits() + ']');
 
                 // Signals others that shared resource is available.
                 semaphore.release();
             }
 
-            System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+            System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
 
             // Gets the syncing semaphore
-            IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+            IgniteSemaphore sem = Ignition.ignite().semaphore(SEM_NAME, 0, true, true);
 
             // Signals the master thread
-            sync.release();
+            sem.release();
         }
     }
 
@@ -115,7 +137,6 @@ public class IgniteSemaphoreExample {
      * Closure which simply waits on semaphore.
      */
     private static class Consumer extends SemaphoreExampleClosure {
-
         /**
          * @param semaphoreName Semaphore name.
          */
@@ -125,23 +146,23 @@ public class IgniteSemaphoreExample {
 
         /** {@inheritDoc} */
         @Override public void run() {
-            IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+            IgniteSemaphore sem = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
 
-            for (int i = 0; i < ITEM_COUNT; i++) {
+            for (int i = 0; i < OPS_COUNT; i++) {
                 // Block if no permits are available.
-                semaphore.acquire();
+                sem.acquire();
 
-                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
+                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                    ", available=" + sem.availablePermits() + ']');
             }
 
-            System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+            System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
 
             // Gets the syncing semaphore
-            IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+            IgniteSemaphore sync = Ignition.ignite().semaphore(SEM_NAME, 3, true, true);
 
-            // Signals the master thread
+            // Signals the master thread.
             sync.release();
         }
     }
 }
-

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index dffb126..5437903 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -431,7 +431,7 @@ public interface Ignite extends AutoCloseable {
      * @throws IgniteException If semaphore could not be fetched or created.
      */
     public IgniteSemaphore semaphore(String name, int cnt, boolean fair, boolean create)
-            throws IgniteException;
+        throws IgniteException;
 
     /**
      * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
@@ -491,4 +491,4 @@ public interface Ignite extends AutoCloseable {
      * @return Affinity.
      */
     public <K> Affinity<K> affinity(String cacheName);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index 5a4b377..1db79a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -1,13 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This interface provides a rich API for working with distributed semaphore. <p> <h1 class="header">Functionality</h1>
- * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. <h1
- * class="header">Creating Distributed Semaphore</h1> Instance of cache semaphore can be created by calling the
- * following method: {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * Instance of cache semaphore can be created by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
  */
 public interface IgniteSemaphore extends Closeable {
     /**
@@ -291,4 +311,3 @@ public interface IgniteSemaphore extends Closeable {
      */
     @Override public void close();
 }
-

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 82d6725..76db4ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1250,11 +1250,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         dsView.put(key, val);
                     }
 
-                    semaphore = new GridCacheSemaphoreImpl(name, val.getCount(),
-                            fair,
-                            key,
-                            semaphoreView,
-                            dsCacheCtx);
+                    semaphore = new GridCacheSemaphoreImpl(
+                        name, val.getCount(),
+                        fair,
+                        key,
+                        semaphoreView,
+                        dsCacheCtx);
 
                     dsMap.put(key, semaphore);
 
@@ -1299,19 +1300,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                     GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
 
                     if (val != null) {
-                        if (val.getCount() < 0) {
-                            throw new IgniteCheckedException("Failed to remove semaphore " +
-                                    "with blocked threads. ");
-                        }
+                        if (val.getCount() < 0)
+                            throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
 
                         dsView.remove(key);
 
                         tx.commit();
-                    } else
+                    }
+                    else
                         tx.setRollbackOnly();
 
                     return null;
-                } finally {
+                }
+                finally {
                     dsCacheCtx.gate().leave();
                 }
             }
@@ -1327,7 +1328,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If removing failed or class of object is different to expected class.
      */
     private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
-        return CU.outTx(new Callable<Boolean>() {
+        return CU.outTx(
+            new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
                     try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                         // Check correctness type of removable object.
@@ -1365,7 +1367,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
             if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
                 return evt.getValue() instanceof GridCacheCountDownLatchValue ||
-                        evt.getValue() instanceof GridCacheSemaphoreState;
+                    evt.getValue() instanceof GridCacheSemaphoreState;
             else {
                 assert evt.getEventType() == EventType.REMOVED : evt;
 
@@ -1573,7 +1575,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 catch (ClusterGroupEmptyCheckedException e) {
                     throw new IgniteCheckedException(e);
                 }
-                catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
+                catch (IgniteTxRollbackCheckedException |
+                    CachePartialUpdateCheckedException |
+                    ClusterTopologyCheckedException e) {
                     if (cnt++ == MAX_UPDATE_RETRIES)
                         throw e;
                     else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 8ecbcc5..76d8bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.datastructures;
 
 import org.apache.ignite.IgniteSemaphore;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 78d923a..85ba9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.datastructures;
 
 import java.io.Externalizable;
@@ -90,8 +107,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     }
 
     /**
-     * Synchronization implementation for semaphore.  Uses AQS state to represent permits. Subclassed into fair and
-     * nonfair versions.
+     * Synchronization implementation for semaphore.
+     * Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
      */
     abstract class Sync extends AbstractQueuedSynchronizer {
         private static final long serialVersionUID = 1192457210091910933L;
@@ -195,7 +212,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
          */
         final int drainPermits() {
             for (; ; ) {
-
                 int current = getState();
 
                 if (current == 0 || compareAndSetGlobalState(current, 0))
@@ -212,7 +228,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                            try (
+                                IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)
+                            ) {
                                 GridCacheSemaphoreState val = semaphoreView.get(key);
 
                                 if (val == null)
@@ -255,18 +273,22 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                            try (
+                                IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView,
+                                    PESSIMISTIC, REPEATABLE_READ)
+                            ) {
                                 GridCacheSemaphoreState val = semaphoreView.get(key);
 
                                 if (val == null)
-                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " +
+                                        name);
 
                                 boolean retVal = val.getCount() == expVal;
 
                                 if (retVal) {
-                                        /* If current thread is queued, than this call is the call that is going to be unblocked. */
+                                    // If current thread is queued, then this call is
+                                    // the call that is going to be unblocked.
                                     if (sync.isQueued(Thread.currentThread())) {
-
                                         int waiting = val.getWaiters() - 1;
 
                                         val.setWaiters(waiting);
@@ -300,7 +322,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     }
 
     /**
-     * NonFair version
+     * NonFair version.
      */
     final class NonfairSync extends Sync {
         private static final long serialVersionUID = 7983135489326435495L;
@@ -356,12 +378,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
      * @param semaphoreView Semaphore projection.
      * @param ctx Cache context.
      */
-    public GridCacheSemaphoreImpl(String name,
+    public GridCacheSemaphoreImpl(
+        String name,
         int initCnt,
         boolean fair,
         GridCacheInternalKey key,
         IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
-        GridCacheContext ctx) {
+        GridCacheContext ctx
+    ) {
         assert name != null;
         assert key != null;
         assert semaphoreView != null;
@@ -381,7 +405,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
      * @throws IgniteCheckedException If operation failed.
      */
     private void initializeSemaphore() throws IgniteCheckedException {
-        if (initGuard.compareAndSet(false, true)) {
+        if (!initGuard.get() && initGuard.compareAndSet(false, true)) {
             try {
                 sync = CU.outTx(
                     retryTopologySafe(new Callable<Sync>() {
@@ -499,6 +523,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     /** {@inheritDoc} */
     @Override public void acquireUninterruptibly(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
 
@@ -514,10 +539,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         int ret;
         try {
             initializeSemaphore();
+
             ret = CU.outTx(
                 retryTopologySafe(new Callable<Integer>() {
                     @Override public Integer call() throws Exception {
-                        try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                        try (
+                            IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)
+                        ) {
                             GridCacheSemaphoreState val = semaphoreView.get(key);
 
                             if (val == null)
@@ -537,6 +565,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+
         return ret;
     }
 
@@ -688,6 +717,4 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override public String toString() {
         return S.toString(GridCacheSemaphoreImpl.class, this);
     }
-
 }
-

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index 1109b53..ce0da0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.datastructures;
 
 import java.io.Externalizable;
@@ -14,19 +31,13 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /** */
     private static final long serialVersionUID = 0L;
 
-    /**
-     * Permission count.
-     */
+    /** Permission count. */
     private int count;
 
-    /**
-     * Waiter id.
-     */
+    /** Number of waiters. */
     private int waiters;
 
-    /**
-     * Fairness flag.
-     */
+    /** Fairness flag. */
     private boolean fair;
 
     /**
@@ -72,48 +83,48 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
         return count;
     }
 
+    /**
+     * @return Waiters.
+     */
     public int getWaiters() {
         return waiters;
     }
 
+    /**
+     * @param id Waiters.
+     */
     public void setWaiters(int id) {
         this.waiters = id;
     }
 
+    /**
+     * @return Fair flag.
+     */
     public boolean isFair() {
         return fair;
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override public Object clone() throws CloneNotSupportedException {
         return super.clone();
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(count);
         out.writeInt(waiters);
         out.writeBoolean(fair);
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException {
         count = in.readInt();
         waiters = in.readInt();
         fair = in.readBoolean();
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheSemaphoreState.class, this);
     }
 }
-

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
index 977a414..24959ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -60,6 +60,7 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
     /** */
     private static final Random RND = new Random();
 
+    /** */
     @Rule
     public final ExpectedException exception = ExpectedException.none();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f38a18e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
index 059e80a..c302cad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
@@ -30,5 +30,4 @@ public class IgnitePartitionedSemaphoreSelfTest extends IgniteSemaphoreAbstractS
     @Override protected CacheMode atomicsCacheMode() {
         return PARTITIONED;
     }
-
 }


[2/6] ignite git commit: Fixes formatting issues;

Posted by yz...@apache.org.
Fixes formatting issues;


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be332a82
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be332a82
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be332a82

Branch: refs/heads/ignite-638
Commit: be332a82711ddd7e9088e00d7f26edd7de407a11
Parents: e9567ad
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Oct 1 20:19:32 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 1 20:19:32 2015 +0200

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  |  68 ++--
 .../java/org/apache/ignite/IgniteSemaphore.java | 396 +++++++------------
 .../datastructures/GridCacheSemaphoreEx.java    |   6 +-
 .../datastructures/GridCacheSemaphoreImpl.java  | 346 +++++++++-------
 .../datastructures/GridCacheSemaphoreState.java |   9 +-
 .../datastructures/GridCacheSemaphoreValue.java | 115 ------
 6 files changed, 396 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 2ef242c..5849f5f 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,22 +1,20 @@
 package org.apache.ignite.examples.datastructures;
 
-import org.apache.ignite.*;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.examples.ExampleNodeStartup;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
 
 /**
- * This example demonstrates cache based semaphore.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-ignite.xml} configuration.
- *
- * @author Vladisav Jelisavcic
+ * This example demonstrates cache based semaphore. <p> Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. <p> Alternatively
+ * you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
  */
 public class IgniteSemaphoreExample {
     /** Cache name. */
@@ -40,7 +38,7 @@ public class IgniteSemaphoreExample {
             System.out.println(">>> Cache atomic semaphore example started.");
 
             // Initialize semaphore.
-            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName,0,false,true);
+            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName, 0, false, true);
 
             // Make name of semaphore.
             final String semaphoreName = UUID.randomUUID().toString();
@@ -50,21 +48,25 @@ public class IgniteSemaphoreExample {
 
             // Make shared resource
             final String resourceName = UUID.randomUUID().toString();
+
+            // Get cache view where the resource will be held
             IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+            // Put the resource queue in the cache
             cache.put(resourceName, new LinkedList<>());
 
             // Initialize semaphore.
             IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
 
             // Initialize mutex.
-            IgniteSemaphore mutex = ignite.semaphore(mutexName,1,false,true);
+            IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, false, true);
 
             // Start consumers on all cluster nodes.
             for (int i = 0; i < NUM_CONSUMERS; i++)
                 ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
 
             // Start producers on all cluster nodes.
-            for(int i = 0; i < NUM_PRODUCERS; i++)
+            for (int i = 0; i < NUM_PRODUCERS; i++)
                 ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
 
             System.out.println("Master node is waiting for all other nodes to finish...");
@@ -121,23 +123,33 @@ public class IgniteSemaphoreExample {
         /** {@inheritDoc} */
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
-            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
 
-            for(int i=0;i<ITEM_COUNT;i++) {
+            for (int i = 0; i < ITEM_COUNT; i++) {
+                // Mutex is used to access shared resource.
                 mutex.acquire();
 
-                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
                 queue.add(Ignition.ignite().cluster().localNode().id().toString());
+
                 Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
                 System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
 
+                // Mutex is released for others to access the resource.
                 mutex.release();
 
+                // Signals others that shared resource is available.
                 semaphore.release();
             }
 
             System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
-            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 0, true, true);
+
+            // Gets the syncing semaphore
+            IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+
+            // Signals the master thread
             sync.release();
         }
     }
@@ -159,23 +171,33 @@ public class IgniteSemaphoreExample {
         /** {@inheritDoc} */
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
-            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
 
-            for(int i=0;i<ITEM_COUNT;i++) {
+            for (int i = 0; i < ITEM_COUNT; i++) {
+                // Block if queue is empty.
                 semaphore.acquire();
 
+                // Mutex is used to access shared resource.
                 mutex.acquire();
 
-                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
                 String data = queue.remove();
+
                 Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
                 System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
 
+                // Mutex is released for others to access the resource.
                 mutex.release();
             }
 
             System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
-            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 3, true, true);
+
+            // Gets the syncing semaphore
+            IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+
+            // Signals the master thread
             sync.release();
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index 0e29d00..5a4b377 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -4,17 +4,12 @@ import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This interface provides a rich API for working with distributed semaphore.
- * <p>
- * <h1 class="header">Functionality</h1>
- * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
- * <h1 class="header">Creating Distributed Semaphore</h1>
- * Instance of cache semaphore can be created by calling the following method:
- * {@link Ignite#semaphore(String, int, boolean, boolean)}.
- *
- * @author Vladisav Jelisavcic
+ * This interface provides a rich API for working with distributed semaphore. <p> <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. <h1
+ * class="header">Creating Distributed Semaphore</h1> Instance of cache semaphore can be created by calling the
+ * following method: {@link Ignite#semaphore(String, int, boolean, boolean)}.
  */
-public interface IgniteSemaphore extends Closeable{
+public interface IgniteSemaphore extends Closeable {
     /**
      * Gets name of the semaphore.
      *
@@ -23,151 +18,110 @@ public interface IgniteSemaphore extends Closeable{
     public String name();
 
     /**
-     * Acquires a permit from this semaphore, blocking until one is
-     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * one of two things happens:
-     * <ul>
-     * <li>Some other thread invokes the {@link #release} method for this
-     * semaphore and the current thread is next to be assigned a permit; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread.
-     * </ul>
-     *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * for a permit,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
+     * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain
+     * Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+     * one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+     * Thread#interrupt interrupts} the current thread. </ul>
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+     * and the current thread's interrupted status is cleared.
      *
      * @throws IgniteInterruptedException if the current thread is interrupted
      */
     public void acquire() throws IgniteInterruptedException;
 
     /**
-     * Acquires a permit from this semaphore, blocking until one is
-     * available.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * some other thread invokes the {@link #release} method for this
-     * semaphore and the current thread is next to be assigned a permit.
-     *
-     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
-     * while waiting for a permit then it will continue to wait, but the
-     * time at which the thread is assigned a permit may change compared to
-     * the time it would have received the permit had no interruption
-     * occurred.  When the thread does return from this method its interrupt
-     * status will be set.
+     * Acquires a permit from this semaphore, blocking until one is available.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+     * one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is
+     * next to be assigned a permit.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will
+     * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would
+     * have received the permit had no interruption occurred.  When the thread does return from this method its
+     * interrupt status will be set.
      */
     public void acquireUninterruptibly();
 
     /**
-     * Acquires a permit from this semaphore, only if one is available at the
-     * time of invocation.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * with the value {@code true},
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then this method will return
-     * immediately with the value {@code false}.
-     *
-     * <p>Even when this semaphore has been set to use a
-     * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
-     * immediately acquire a permit if one is available, whether or not
-     * other threads are currently waiting.
-     * This &quot;barging&quot; behavior can be useful in certain
-     * circumstances, even though it breaks fairness. If you want to honor
-     * the fairness setting, then use
-     * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
-     * which is almost equivalent (it also detects interruption).
-     *
-     * @return {@code true} if a permit was acquired and {@code false}
-     *         otherwise
+     * Acquires a permit from this semaphore, only if one is available at the time of invocation.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+     * number of available permits by one.
+     *
+     * <p>If no permit is available then this method will return immediately with the value {@code false}.
+     *
+     * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire()}
+     * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+     * waiting. This &quot;barging&quot; behavior can be useful in certain circumstances, even though it breaks
+     * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(long, TimeUnit) tryAcquire(0,
+     * TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
+     *
+     * @return {@code true} if a permit was acquired and {@code false} otherwise
      */
     public boolean tryAcquire();
 
     /**
-     * Acquires a permit from this semaphore, if one becomes available
-     * within the given waiting time and the current thread has not
-     * been {@linkplain Thread#interrupt interrupted}.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * with the value {@code true},
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * one of three things happens:
-     * <ul>
-     * <li>Some other thread invokes the {@link #release} method for this
-     * semaphore and the current thread is next to be assigned a permit; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread; or
-     * <li>The specified waiting time elapses.
-     * </ul>
+     * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current
+     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+     * number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for
+     * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+     * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul>
      *
      * <p>If a permit is acquired then the value {@code true} is returned.
      *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * to acquire a permit,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is
+     * thrown and the current thread's interrupted status is cleared.
      *
-     * <p>If the specified waiting time elapses then the value {@code false}
-     * is returned.  If the time is less than or equal to zero, the method
-     * will not wait at all.
+     * <p>If the specified waiting time elapses then the value {@code false} is returned.  If the time is less than or
+     * equal to zero, the method will not wait at all.
      *
      * @param timeout the maximum time to wait for a permit
      * @param unit the time unit of the {@code timeout} argument
-     * @return {@code true} if a permit was acquired and {@code false}
-     *         if the waiting time elapsed before a permit was acquired
+     * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was
+     * acquired
      * @throws IgniteInterruptedException if the current thread is interrupted
      */
     public boolean tryAcquire(long timeout, TimeUnit unit)
-            throws IgniteInterruptedException;
+        throws IgniteInterruptedException;
 
     /**
-     * Acquires the given number of permits from this semaphore,
-     * blocking until all are available.
+     * Acquires the given number of permits from this semaphore, blocking until all are available.
      *
-     * <p>Acquires the given number of permits, if they are available,
-     * and returns immediately, reducing the number of available permits
-     * by the given amount.
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+     * available permits by the given amount.
      *
-     * <p>If insufficient permits are available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * some other thread invokes one of the {@link #release() release}
-     * methods for this semaphore, the current thread is next to be assigned
-     * permits and the number of available permits satisfies this request.
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this
+     * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this
+     * request.
      *
-     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
-     * while waiting for permits then it will continue to wait and its
-     * position in the queue is not affected.  When the thread does return
-     * from this method its interrupt status will be set.
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will
+     * continue to wait and its position in the queue is not affected.  When the thread does return from this method its
+     * interrupt status will be set.
      *
      * @param permits the number of permits to acquire
      * @throws IllegalArgumentException if {@code permits} is negative
      */
     public void acquireUninterruptibly(int permits);
 
-
     /**
      * Returns the current number of permits available in this semaphore.
      *
@@ -187,131 +141,91 @@ public interface IgniteSemaphore extends Closeable{
     /**
      * Releases a permit, returning it to the semaphore.
      *
-     * <p>Releases a permit, increasing the number of available permits by
-     * one.  If any threads are trying to acquire a permit, then one is
-     * selected and given the permit that was just released.  That thread
-     * is (re)enabled for thread scheduling purposes.
+     * <p>Releases a permit, increasing the number of available permits by one.  If any threads are trying to acquire a
+     * permit, then one is selected and given the permit that was just released.  That thread is (re)enabled for thread
+     * scheduling purposes.
      *
-     * <p>There is no requirement that a thread that releases a permit must
-     * have acquired that permit by calling {@link #acquire}.
-     * Correct usage of a semaphore is established by programming convention
-     * in the application.
+     * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+     * #acquire}. Correct usage of a semaphore is established by programming convention in the application.
      */
     public void release();
 
     /**
-     * Acquires the given number of permits from this semaphore, if all
-     * become available within the given waiting time and the current
-     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     * Acquires the given number of permits from this semaphore, if all become available within the given waiting time
+     * and the current thread has not been {@linkplain Thread#interrupt interrupted}.
      *
-     * <p>Acquires the given number of permits, if they are available and
-     * returns immediately, with the value {@code true},
-     * reducing the number of available permits by the given amount.
-     *
-     * <p>If insufficient permits are available then
-     * the current thread becomes disabled for thread scheduling
-     * purposes and lies dormant until one of three things happens:
-     * <ul>
-     * <li>Some other thread invokes one of the {@link #release() release}
-     * methods for this semaphore, the current thread is next to be assigned
-     * permits and the number of available permits satisfies this request; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread; or
-     * <li>The specified waiting time elapses.
-     * </ul>
+     * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code
+     * true}, reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link
+     * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number
+     * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or <li>The specified waiting time elapses. </ul>
      *
      * <p>If the permits are acquired then the value {@code true} is returned.
      *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * to acquire the permits,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
-     * Any permits that were to be assigned to this thread, are instead
-     * assigned to other threads trying to acquire permits, as if
-     * the permits had been made available by a call to {@link #release()}.
-     *
-     * <p>If the specified waiting time elapses then the value {@code false}
-     * is returned.  If the time is less than or equal to zero, the method
-     * will not wait at all.  Any permits that were to be assigned to this
-     * thread, are instead assigned to other threads trying to acquire
-     * permits, as if the permits had been made available by a call to
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException}
+     * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this
+     * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made
+     * available by a call to {@link #release()}.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false} is returned.  If the time is less than or
+     * equal to zero, the method will not wait at all.  Any permits that were to be assigned to this thread, are instead
+     * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to
      * {@link #release()}.
      *
      * @param permits the number of permits to acquire
      * @param timeout the maximum time to wait for the permits
      * @param unit the time unit of the {@code timeout} argument
-     * @return {@code true} if all permits were acquired and {@code false}
-     *         if the waiting time elapsed before all permits were acquired
+     * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all
+     * permits were acquired
      * @throws IgniteInterruptedException if the current thread is interrupted
      * @throws IllegalArgumentException if {@code permits} is negative
      */
     public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
-            throws IgniteInterruptedException;
+        throws IgniteInterruptedException;
 
     /**
-     * Acquires the given number of permits from this semaphore, only
-     * if all are available at the time of invocation.
-     *
-     * <p>Acquires the given number of permits, if they are available, and
-     * returns immediately, with the value {@code true},
-     * reducing the number of available permits by the given amount.
-     *
-     * <p>If insufficient permits are available then this method will return
-     * immediately with the value {@code false} and the number of available
-     * permits is unchanged.
-     *
-     * <p>Even when this semaphore has been set to use a fair ordering
-     * policy, a call to {@code tryAcquire} <em>will</em>
-     * immediately acquire a permit if one is available, whether or
-     * not other threads are currently waiting.  This
-     * &quot;barging&quot; behavior can be useful in certain
-     * circumstances, even though it breaks fairness. If you want to
-     * honor the fairness setting, then use {@link #tryAcquire(int,
-     * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
-     * which is almost equivalent (it also detects interruption).
+     * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code
+     * true}, reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then this method will return immediately with the value {@code false}
+     * and the number of available permits is unchanged.
+     *
+     * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire}
+     * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+     * waiting.  This &quot;barging&quot; behavior can be useful in certain circumstances, even though it breaks
+     * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(int, long, TimeUnit)
+     * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
      *
      * @param permits the number of permits to acquire
-     * @return {@code true} if the permits were acquired and
-     *         {@code false} otherwise
+     * @return {@code true} if the permits were acquired and {@code false} otherwise
      * @throws IllegalArgumentException if {@code permits} is negative
      */
     public boolean tryAcquire(int permits);
 
     /**
-     * Acquires the given number of permits from this semaphore,
-     * blocking until all are available,
-     * or the thread is {@linkplain Thread#interrupt interrupted}.
-     *
-     * <p>Acquires the given number of permits, if they are available,
-     * and returns immediately, reducing the number of available permits
-     * by the given amount.
-     *
-     * <p>If insufficient permits are available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * one of two things happens:
-     * <ul>
-     * <li>Some other thread invokes one of the {@link #release() release}
-     * methods for this semaphore, the current thread is next to be assigned
-     * permits and the number of available permits satisfies this request; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread.
-     * </ul>
-     *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * for a permit,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
-     * Any permits that were to be assigned to this thread are instead
-     * assigned to other threads trying to acquire permits, as if
-     * permits had been made available by a call to {@link #release()}.
+     * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+     * available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release()
+     * release} methods for this semaphore, the current thread is next to be assigned permits and the number of
+     * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread. </ul>
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+     * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are
+     * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to
+     * {@link #release()}.
      *
      * @param permits the number of permits to acquire
      * @throws IgniteInterruptedException if the current thread is interrupted
@@ -322,21 +236,16 @@ public interface IgniteSemaphore extends Closeable{
     /**
      * Releases the given number of permits, returning them to the semaphore.
      *
-     * <p>Releases the given number of permits, increasing the number of
-     * available permits by that amount.
-     * If any threads are trying to acquire permits, then one
-     * is selected and given the permits that were just released.
-     * If the number of available permits satisfies that thread's request
-     * then that thread is (re)enabled for thread scheduling purposes;
-     * otherwise the thread will wait until sufficient permits are available.
-     * If there are still permits available
-     * after this thread's request has been satisfied, then those permits
-     * are assigned in turn to other threads trying to acquire permits.
-     *
-     * <p>There is no requirement that a thread that releases a permit must
-     * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
-     * Correct usage of a semaphore is established by programming convention
-     * in the application.
+     * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
+     * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the
+     * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling
+     * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits
+     * available after this thread's request has been satisfied, then those permits are assigned in turn to other
+     * threads trying to acquire permits.
+     *
+     * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+     * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the
+     * application.
      *
      * @param permits the number of permits to release
      * @throws IllegalArgumentException if {@code permits} is negative
@@ -351,23 +260,18 @@ public interface IgniteSemaphore extends Closeable{
     public boolean isFair();
 
     /**
-     * Queries whether any threads are waiting to acquire. Note that
-     * because cancellations may occur at any time, a {@code true}
-     * return does not guarantee that any other thread will ever
-     * acquire.  This method is designed primarily for use in
-     * monitoring of the system state.
-     *
-     * @return {@code true} if there may be other threads waiting to
-     *         acquire the lock
+     * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a
+     * {@code true} return does not guarantee that any other thread will ever acquire.  This method is designed
+     * primarily for use in monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads waiting to acquire a permit
      */
     public boolean hasQueuedThreads();
 
     /**
-     * Returns an estimate of the number of threads waiting to acquire.
-     * The value is only an estimate because the number of threads may
-     * change dynamically while this method traverses internal data
-     * structures.  This method is designed for use in monitoring of the
-     * system state, not for synchronization control.
+     * Returns an estimate of the number of threads waiting to acquire. The value is only an estimate because the number
+     * of threads may change dynamically while this method traverses internal data structures.  This method is designed
+     * for use in monitoring of the system state, not for synchronization control.
      *
      * @return the estimated number of threads waiting for this lock
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 0f939d5..8ecbcc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -3,7 +3,7 @@ package org.apache.ignite.internal.processors.datastructures;
 import org.apache.ignite.IgniteSemaphore;
 
 /**
- * Created by vladisav on 20.9.15..
+ * Grid cache semaphore ({@code 'Ex'} stands for external).
  */
 public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
     /**
@@ -16,7 +16,7 @@ public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovabl
     /**
      * Callback to notify semaphore on changes.
      *
-     * @param val Id of the caller and number of permissions to acquire (or release; can be negative).
+     * @param val State containing the number of available permits.
      */
     public void onUpdate(GridCacheSemaphoreState val);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 24c3ec5..17efc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -1,6 +1,20 @@
 package org.apache.ignite.internal.processors.datastructures;
 
-import org.apache.ignite.*;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -11,26 +25,16 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
- * Cache semaphore implementation based on AbstractQueuedSynchronizer.
- * Current implementation supports only unfair and locally fair modes.
- * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
- * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
- * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
- *
- * @author Vladisav Jelisavcic
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer. Current implementation supports only unfair and
+ * locally fair modes. When fairness is set to false, this class makes no guarantees about the order in which threads
+ * acquire permits. When fairness is set to true, the semaphore only guarantees that local threads invoking any of the
+ * acquire methods are selected to obtain permits in the order in which their invocations were processed (FIFO).
  */
 public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
     /** */
@@ -38,11 +42,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
-            new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
-                @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
-                    return F.t2();
-                }
-            };
+        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+                return F.t2();
+            }
+        };
 
     /** Logger. */
     private IgniteLogger log;
@@ -50,7 +54,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     /** Semaphore name. */
     private String name;
 
-    /** Removed flag.*/
+    /** Removed flag. */
     private volatile boolean rmvd;
 
     /** Semaphore key. */
@@ -86,14 +90,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     }
 
     /**
-     * Synchronization implementation for semaphore.  Uses AQS state
-     * to represent permits. Subclassed into fair and nonfair
-     * versions.
+     * Synchronization implementation for semaphore. Uses AQS state to represent permits. Subclassed into fair and
+     * nonfair versions.
      */
     abstract class Sync extends AbstractQueuedSynchronizer {
         private static final long serialVersionUID = 1192457210091910933L;
 
-        protected ConcurrentMap<Thread,Integer> threadMap;
+        protected final ConcurrentMap<Thread, Integer> threadMap;
         protected int totalWaiters;
 
         Sync(int permits) {
@@ -101,7 +104,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             threadMap = new ConcurrentHashMap<>();
         }
 
-        protected synchronized void setWaiters(int waiters){
+        protected synchronized void setWaiters(int waiters) {
             totalWaiters = waiters;
         }
 
@@ -109,7 +112,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             return totalWaiters;
         }
 
-        final synchronized void setPermits(int permits){
+        final synchronized void setPermits(int permits) {
             setState(permits);
         }
 
@@ -118,13 +121,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
 
         final int nonfairTryAcquireShared(int acquires) {
-            for (;;) {
+            for (; ; ) {
                 int available = getState();
+
                 int remaining = available - acquires;
 
                 if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
-                    if(remaining < 0){
-                        if(!threadMap.containsKey(Thread.currentThread()))
+                    if (remaining < 0) {
+                        if (!threadMap.containsKey(Thread.currentThread()))
                             getAndIncWaitingCount();
                     }
 
@@ -136,33 +140,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         protected final boolean tryReleaseShared(int releases) {
             // Check if some other node updated the state.
             // This method is called with release==0 only when trying to wake through update.
-            if(releases == 0)
+            if (releases == 0)
                 return true;
 
-            for (;;) {
+            for (; ; ) {
                 int current = getState();
+
                 int next = current + releases;
+
                 if (next < current) // overflow
                     throw new Error("Maximum permit count exceeded");
+
                 if (compareAndSetGlobalState(current, next))
                     return true;
             }
         }
 
         final void reducePermits(int reductions) {
-            for (;;) {
+            for (; ; ) {
                 int current = getState();
+
                 int next = current - reductions;
+
                 if (next > current) // underflow
                     throw new Error("Permit count underflow");
+
                 if (compareAndSetGlobalState(current, next))
                     return;
             }
         }
 
         final int drainPermits() {
-            for (;;) {
+            for (; ; ) {
+
                 int current = getState();
+
                 if (current == 0 || compareAndSetGlobalState(current, 0))
                     return current;
             }
@@ -171,34 +183,40 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         protected void getAndIncWaitingCount() {
             try {
                 CU.outTx(
-                        retryTopologySafe(new Callable<Boolean>() {
-                            @Override
-                            public Boolean call() throws Exception {
-                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                    if (val == null)
-                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
 
-                                    int waiting = val.getWaiters();
-                                    sync.threadMap.put(Thread.currentThread(), waiting);
+                                int waiting = val.getWaiters();
 
-                                    waiting++;
-                                    val.setWaiters(waiting);
-                                    semaphoreView.put(key, val);
-                                    tx.commit();
+                                sync.threadMap.put(Thread.currentThread(), waiting);
 
-                                    return true;
-                                } catch (Error | Exception e) {
-                                    U.error(log, "Failed to compare and set: " + this, e);
+                                waiting++;
 
-                                    throw e;
-                                }
+                                val.setWaiters(waiting);
+
+                                semaphoreView.put(key, val);
+
+                                tx.commit();
+
+                                return true;
+                            }
+                            catch (Error | Exception e) {
+                                U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
                             }
-                        }),
-                        ctx
+                        }
+                    }),
+                    ctx
                 );
-            } catch (IgniteCheckedException e) {
+            }
+            catch (IgniteCheckedException e) {
                 throw U.convertException(e);
             }
         }
@@ -206,44 +224,48 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
             try {
                 return CU.outTx(
-                        retryTopologySafe(new Callable<Boolean>() {
-                            @Override
-                            public Boolean call() throws Exception {
-                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                    if (val == null)
-                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
 
-                                    boolean retVal = val.getCnt() == expVal;
+                                boolean retVal = val.getCnt() == expVal;
 
-                                    if (retVal) {
+                                if (retVal) {
                                         /* If current thread is queued, than this call is the call that is going to be unblocked. */
-                                        if(sync.isQueued(Thread.currentThread())) {
-
-                                            int waiting = val.getWaiters() - 1;
-                                            val.setWaiters(waiting);
+                                    if (sync.isQueued(Thread.currentThread())) {
 
-                                            sync.threadMap.remove(Thread.currentThread());
-                                        }
+                                        int waiting = val.getWaiters() - 1;
 
-                                        val.setCnt(newVal);
+                                        val.setWaiters(waiting);
 
-                                        semaphoreView.put(key, val);
-                                        tx.commit();
+                                        sync.threadMap.remove(Thread.currentThread());
                                     }
 
-                                    return retVal;
-                                } catch (Error | Exception e) {
-                                    U.error(log, "Failed to compare and set: " + this, e);
+                                    val.setCnt(newVal);
+
+                                    semaphoreView.put(key, val);
 
-                                    throw e;
+                                    tx.commit();
                                 }
+
+                                return retVal;
+                            }
+                            catch (Error | Exception e) {
+                                U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
                             }
-                        }),
-                        ctx
+                        }
+                    }),
+                    ctx
                 );
-            } catch( IgniteCheckedException e){
+            }
+            catch (IgniteCheckedException e) {
                 throw U.convertException(e);
             }
         }
@@ -275,16 +297,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
 
         protected int tryAcquireShared(int acquires) {
-            for (;;) {
+            for (; ; ) {
                 if (hasQueuedPredecessors())
                     return -1;
 
                 int available = getState();
+
                 int remaining = available - acquires;
 
                 if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
-                    if(remaining < 0){
-                        if(!threadMap.containsKey(Thread.currentThread()))
+                    if (remaining < 0) {
+                        if (!threadMap.containsKey(Thread.currentThread()))
                             getAndIncWaitingCount();
                     }
                     return remaining;
@@ -292,7 +315,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             }
         }
 
-
     }
 
     /**
@@ -305,12 +327,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
      * @param ctx Cache context.
      */
     public GridCacheSemaphoreImpl(String name,
-                                       int initCnt,
-                                       boolean fair,
-                                       GridCacheInternalKey key,
-                                       IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
-                                       GridCacheContext ctx)
-    {
+        int initCnt,
+        boolean fair,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+        GridCacheContext ctx) {
         assert name != null;
         assert key != null;
         assert semaphoreView != null;
@@ -333,28 +354,30 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         if (initGuard.compareAndSet(false, true)) {
             try {
                 sync = CU.outTx(
-                        retryTopologySafe(new Callable<Sync>() {
-                            @Override
-                            public Sync call() throws Exception {
-                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+                    retryTopologySafe(new Callable<Sync>() {
+                        @Override
+                        public Sync call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                    if (val == null) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Failed to find semaphore with given name: " + name);
+                                if (val == null) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find semaphore with given name: " + name);
 
-                                        return null;
-                                    }
+                                    return null;
+                                }
 
-                                    final int count = val.getCnt();
-                                    tx.commit();
+                                final int count = val.getCnt();
 
-                                    return val.isFair() ? new FairSync(count) : new NonfairSync(count);
-                                }
+                                tx.commit();
+
+                                return val.isFair() ? new FairSync(count) : new NonfairSync(count);
                             }
-                        }),
-                        ctx
+                        }
+                    }),
+                    ctx
                 );
+
                 if (log.isDebugEnabled())
                     log.debug("Initialized internal sync structure: " + sync);
             }
@@ -370,17 +393,20 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-
     /** {@inheritDoc} */
     @Override public String name() {
         return name;
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheInternalKey key() { return key; }
+    @Override public GridCacheInternalKey key() {
+        return key;
+    }
 
     /** {@inheritDoc} */
-    @Override public boolean removed(){ return rmvd; }
+    @Override public boolean removed() {
+        return rmvd;
+    }
 
     /** {@inheritDoc} */
     @Override public boolean onRemoved() {
@@ -389,7 +415,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
     /** {@inheritDoc} */
     @Override public void onUpdate(GridCacheSemaphoreState val) {
-        if(sync == null)
+        if (sync == null)
             return;
 
         // Update permission count.
@@ -415,25 +441,28 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override
     public void acquire(int permits) throws IgniteInterruptedException {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
-            sync.acquireSharedInterruptibly(permits);
 
-        } catch (IgniteCheckedException e) {
+            sync.acquireSharedInterruptibly(permits);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
     }
 
-
     @Override
     public void acquireUninterruptibly() {
         try {
             initializeSemaphore();
-            sync.acquireShared(1);
 
-        } catch (IgniteCheckedException e) {
+            sync.acquireShared(1);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -443,38 +472,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
-            sync.acquireShared(permits);
 
-        } catch (IgniteCheckedException e) {
+            sync.acquireShared(permits);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
 
     @Override
-    public int availablePermits(){
+    public int availablePermits() {
         int ret;
         try {
             initializeSemaphore();
             ret = CU.outTx(
-                    retryTopologySafe(new Callable<Integer>() {
-                        @Override
-                        public Integer call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                GridCacheSemaphoreState val = semaphoreView.get(key);
+                retryTopologySafe(new Callable<Integer>() {
+                    @Override
+                    public Integer call() throws Exception {
+                        try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                            GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                if (val == null)
-                                    throw new IgniteException("Failed to find semaphore with given name: " + name);
+                            if (val == null)
+                                throw new IgniteException("Failed to find semaphore with given name: " + name);
 
-                                int count = val.getCnt();
-                                tx.rollback();
+                            int count = val.getCnt();
 
-                                return count;
-                            }
+                            tx.rollback();
+
+                            return count;
                         }
-                    }),
-                    ctx
+                    }
+                }),
+                ctx
             );
-        } catch (IgniteCheckedException e) {
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         return ret;
@@ -484,9 +516,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public int drainPermits() {
         try {
             initializeSemaphore();
-            return sync.drainPermits();
 
-        } catch (IgniteCheckedException e) {
+            return sync.drainPermits();
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -495,9 +528,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public boolean tryAcquire() {
         try {
             initializeSemaphore();
-            return sync.nonfairTryAcquireShared(1) >= 0;
 
-        } catch (IgniteCheckedException e) {
+            return sync.nonfairTryAcquireShared(1) >= 0;
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -506,11 +540,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
         try {
             initializeSemaphore();
-            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
 
-        } catch (IgniteCheckedException e) {
+            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
     }
@@ -523,11 +559,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override
     public void release(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
-            sync.releaseShared(permits);
 
-        } catch (IgniteCheckedException e) {
+            sync.releaseShared(permits);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -535,11 +573,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override
     public boolean tryAcquire(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
-            return sync.nonfairTryAcquireShared(permits) >= 0;
 
-        } catch (IgniteCheckedException e) {
+            return sync.nonfairTryAcquireShared(permits) >= 0;
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -549,11 +589,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
-            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
 
-        } catch (IgniteCheckedException e) {
+            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
     }
@@ -567,8 +609,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public boolean hasQueuedThreads() {
         try {
             initializeSemaphore();
-            return sync.getWaiters()!=0;
-        } catch (IgniteCheckedException e) {
+
+            return sync.getWaiters() != 0;
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -577,8 +621,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public int getQueueLength() {
         try {
             initializeSemaphore();
+
             return sync.getWaiters();
-        } catch (IgniteCheckedException e) {
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index cf44b7d..a02b7c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -1,18 +1,14 @@
 package org.apache.ignite.internal.processors.datastructures;
 
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Grid cache semaphore state.
- *
- * @author Vladisav Jelisavcic
  */
 public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
     /** */
@@ -33,7 +29,6 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
      */
     private boolean fair;
 
-
     /**
      * Constructor.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
deleted file mode 100644
index 689b647..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.ignite.internal.processors.datastructures;
-
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-/**
- * Created by vladisav on 20.9.15..
- */
-public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Permission count.
-     */
-    private int cnt;
-
-    /**
-     * Semaphore ID.
-     */
-    private long semaphoreId;
-
-    /**
-     * Constructor.
-     *
-     * @param cnt Number of permissions.
-     * @param
-     */
-    public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
-        this.cnt = cnt;
-
-        this.semaphoreId = semaphoreId;
-    }
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridCacheSemaphoreValue() {
-        // No-op.
-    }
-
-    /**
-     * @param cnt New count.
-     */
-    public void set(int cnt) {
-        this.cnt = cnt;
-    }
-
-    /**
-     * @return Current count.
-     */
-    public int get() {
-        return cnt;
-    }
-
-    /**
-     * @return true if number of permissions to be added is positive
-     */
-    public boolean isRelease(){
-        return cnt>0;
-    }
-
-    /**
-     * @return true if permission count should be lowered
-     */
-    public boolean isAwait(){
-        return cnt<0;
-    }
-
-    /**
-     * @return Semaphore ID.
-     */
-    public long semaphoreId() {
-        return semaphoreId;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Object clone() throws CloneNotSupportedException {
-        return super.clone();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(cnt);
-        out.writeLong(semaphoreId);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void readExternal(ObjectInput in) throws IOException {
-        cnt = in.readInt();
-        semaphoreId = in.readLong();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String toString() {
-        return S.toString(GridCacheSemaphoreValue.class, this);
-    }
-}
\ No newline at end of file


[4/6] ignite git commit: Adds test coverage; Simplifies example;

Posted by yz...@apache.org.
Adds test coverage; Simplifies example;


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c6852d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c6852d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c6852d0

Branch: refs/heads/ignite-638
Commit: 8c6852d07481c3943584f676b48b8c938342e640
Parents: 77f9deb
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Oct 15 01:17:02 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Thu Oct 15 01:17:02 2015 +0200

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  |  78 +----
 .../datastructures/DataStructuresProcessor.java |  26 +-
 .../datastructures/GridCacheSemaphoreImpl.java  |  12 +-
 .../datastructures/GridCacheSemaphoreState.java |  28 +-
 .../IgniteClientReconnectAtomicsTest.java       |  44 ++-
 .../IgniteClientDataStructuresAbstractTest.java |  59 +++-
 .../IgniteDataStructureUniqueNameTest.java      |  14 +-
 .../IgniteSemaphoreAbstractSelfTest.java        | 328 +++++++++++++++++++
 .../local/IgniteLocalSemaphoreSelfTest.java     |  98 ++++++
 .../IgnitePartitionedSemaphoreSelfTest.java     |  34 ++
 .../IgniteReplicatedSemaphoreSelfTest.java      |  33 ++
 .../cache/GridCacheDataStructuresLoadTest.java  | 283 +++++++++-------
 12 files changed, 812 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 5849f5f..ece0ffc 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,10 +1,7 @@
 package org.apache.ignite.examples.datastructures;
 
-import java.util.LinkedList;
-import java.util.Queue;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.examples.ExampleNodeStartup;
@@ -43,31 +40,16 @@ public class IgniteSemaphoreExample {
             // Make name of semaphore.
             final String semaphoreName = UUID.randomUUID().toString();
 
-            // Make name of mutex
-            final String mutexName = UUID.randomUUID().toString();
-
-            // Make shared resource
-            final String resourceName = UUID.randomUUID().toString();
-
-            // Get cache view where the resource will be held
-            IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
-
-            // Put the resource queue the cache
-            cache.put(resourceName, new LinkedList<>());
-
             // Initialize semaphore.
             IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
 
-            // Initialize mutex.
-            IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, false, true);
-
             // Start consumers on all cluster nodes.
             for (int i = 0; i < NUM_CONSUMERS; i++)
-                ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
+                ignite.compute().withAsync().run(new Consumer(semaphoreName));
 
             // Start producers on all cluster nodes.
             for (int i = 0; i < NUM_PRODUCERS; i++)
-                ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
+                ignite.compute().withAsync().run(new Producer(semaphoreName));
 
             System.out.println("Master node is waiting for all other nodes to finish...");
 
@@ -88,21 +70,11 @@ public class IgniteSemaphoreExample {
         /** Semaphore name. */
         protected final String semaphoreName;
 
-        /** Mutex name. */
-        protected final String mutexName;
-
-        /** Resource name. */
-        protected final String resourceName;
-
         /**
-         * @param mutexName Mutex name.
          * @param semaphoreName Semaphore name.
-         * @param resourceName Resource name.
          */
-        SemaphoreExampleClosure(String mutexName, String semaphoreName, String resourceName) {
+        SemaphoreExampleClosure(String semaphoreName) {
             this.semaphoreName = semaphoreName;
-            this.mutexName = mutexName;
-            this.resourceName = resourceName;
         }
     }
 
@@ -112,33 +84,18 @@ public class IgniteSemaphoreExample {
     private static class Producer extends SemaphoreExampleClosure {
 
         /**
-         * @param mutexName Mutex name.
          * @param semaphoreName Semaphore name.
-         * @param resourceName Resource name.
          */
-        public Producer(String mutexName, String semaphoreName, String resourceName) {
-            super(mutexName, semaphoreName, resourceName);
+        public Producer(String semaphoreName) {
+            super(semaphoreName);
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
-            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
 
             for (int i = 0; i < ITEM_COUNT; i++) {
-                // Mutex is used to access shared resource.
-                mutex.acquire();
-
-                Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
-
-                queue.add(Ignition.ignite().cluster().localNode().id().toString());
-
-                Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
-
-                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
-
-                // Mutex is released for others to access the resource.
-                mutex.release();
+                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
 
                 // Signals others that shared resource is available.
                 semaphore.release();
@@ -160,36 +117,21 @@ public class IgniteSemaphoreExample {
     private static class Consumer extends SemaphoreExampleClosure {
 
         /**
-         * @param mutexName Mutex name.
          * @param semaphoreName Semaphore name.
-         * @param resourceName Resource name.
          */
-        public Consumer(String mutexName, String semaphoreName, String resourceName) {
-            super(mutexName, semaphoreName, resourceName);
+        public Consumer(String semaphoreName) {
+            super(semaphoreName);
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
-            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
 
             for (int i = 0; i < ITEM_COUNT; i++) {
-                // Block if queue is empty.
+                // Block if no permits are available.
                 semaphore.acquire();
 
-                // Mutex is used to access shared resource.
-                mutex.acquire();
-
-                Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
-
-                String data = queue.remove();
-
-                Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
-
-                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
-
-                // Signals others that shared resource is available.
-                mutex.release();
+                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "]. Available: " + semaphore.availablePermits());
             }
 
             System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 220dec2..b97b24f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1195,19 +1195,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      *      {@code create} is false.
      * @throws IgniteCheckedException If operation failed.
      */
-    public IgniteSemaphore semaphore(final String name,
-                                               final int cnt,
-                                               final boolean fair,
-                                               final boolean create)
-            throws IgniteCheckedException
-    {
+    public IgniteSemaphore semaphore(final String name, final int cnt, final boolean fair, final boolean create)
+        throws IgniteCheckedException {
         A.notNull(name, "name");
 
         awaitInitialization();
 
-        if (create)
-            A.ensure(cnt >= 0, "count can not be negative");
-
         checkAtomicsConfiguration();
 
         startQuery();
@@ -1234,12 +1227,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         return null;
 
                     if (val == null) {
-                        val = new GridCacheSemaphoreState(cnt, 0);
+                        val = new GridCacheSemaphoreState(cnt, 0, fair);
 
                         dsView.put(key, val);
                     }
 
-                    semaphore = new GridCacheSemaphoreImpl(name, val.getCnt(),
+                    semaphore = new GridCacheSemaphoreImpl(name, val.getCount(),
                             fair,
                             key,
                             semaphoreView,
@@ -1278,19 +1271,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         awaitInitialization();
 
         removeDataStructure(new IgniteOutClosureX<Void>() {
-            @Override
-            public Void applyx() throws IgniteCheckedException {
+            @Override public Void applyx() throws IgniteCheckedException {
                 GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
                 dsCacheCtx.gate().enter();
 
                 try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     // Check correctness type of removable object.
-                    GridCacheSemaphoreState val =
-                            cast(dsView.get(key), GridCacheSemaphoreState.class);
+                    GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
 
                     if (val != null) {
-                        if (val.getCnt() < 0) {
+                        if (val.getCount() < 0) {
                             throw new IgniteCheckedException("Failed to remove semaphore " +
                                     "with blocked threads. ");
                         }
@@ -1318,8 +1309,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If removing failed or class of object is different to expected class.
      */
     private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
-        return CU.outTx(
-            new Callable<Boolean>() {
+        return CU.outTx(new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
                     try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                         // Check correctness type of removable object.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index d2a966f..78d923a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -261,7 +261,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                 if (val == null)
                                     throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
 
-                                boolean retVal = val.getCnt() == expVal;
+                                boolean retVal = val.getCount() == expVal;
 
                                 if (retVal) {
                                         /* If current thread is queued, than this call is the call that is going to be unblocked. */
@@ -274,7 +274,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                         sync.threadMap.remove(Thread.currentThread());
                                     }
 
-                                    val.setCnt(newVal);
+                                    val.setCount(newVal);
 
                                     semaphoreView.put(key, val);
 
@@ -396,7 +396,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                     return null;
                                 }
 
-                                final int count = val.getCnt();
+                                final int count = val.getCount();
 
                                 tx.commit();
 
@@ -448,7 +448,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             return;
 
         // Update permission count.
-        sync.setPermits(val.getCnt());
+        sync.setPermits(val.getCount());
 
         // Update waiters count.
         sync.setWaiters(val.getWaiters());
@@ -463,7 +463,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     }
 
     /** {@inheritDoc} */
-    @Override public void acquire() throws IgniteException {
+    @Override public void acquire() throws IgniteInterruptedException {
         acquire(1);
     }
 
@@ -523,7 +523,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                             if (val == null)
                                 throw new IgniteException("Failed to find semaphore with given name: " + name);
 
-                            int count = val.getCnt();
+                            int count = val.getCount();
 
                             tx.rollback();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index e25649e..1109b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -17,7 +17,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * Permission count.
      */
-    private int cnt;
+    private int count;
 
     /**
      * Waiter id.
@@ -32,10 +32,10 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * Constructor.
      *
-     * @param cnt Number of permissions.
+     * @param count Number of permissions.
      */
-    public GridCacheSemaphoreState(int cnt, int waiters) {
-        this.cnt = cnt;
+    public GridCacheSemaphoreState(int count, int waiters) {
+        this.count = count;
         this.waiters = waiters;
         this.fair = false;
     }
@@ -43,10 +43,10 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * Constructor.
      *
-     * @param cnt Number of permissions.
+     * @param count Number of permissions.
      */
-    public GridCacheSemaphoreState(int cnt, int waiters, boolean fair) {
-        this.cnt = cnt;
+    public GridCacheSemaphoreState(int count, int waiters, boolean fair) {
+        this.count = count;
         this.waiters = waiters;
         this.fair = fair;
     }
@@ -59,17 +59,17 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     }
 
     /**
-     * @param cnt New count.
+     * @param count New count.
      */
-    public void setCnt(int cnt) {
-        this.cnt = cnt;
+    public void setCount(int count) {
+        this.count = count;
     }
 
     /**
      * @return Current count.
      */
-    public int getCnt() {
-        return cnt;
+    public int getCount() {
+        return count;
     }
 
     public int getWaiters() {
@@ -95,7 +95,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
      * {@inheritDoc}
      */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(cnt);
+        out.writeInt(count);
         out.writeInt(waiters);
         out.writeBoolean(fair);
     }
@@ -104,7 +104,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
      * {@inheritDoc}
      */
     @Override public void readExternal(ObjectInput in) throws IOException {
-        cnt = in.readInt();
+        count = in.readInt();
         waiters = in.readInt();
         fair = in.readBoolean();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 55dbb57..c46b5c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.testframework.GridTestUtils;
 
@@ -675,4 +676,45 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
         assertTrue(srvLatch.await(1000));
         assertTrue(clientLatch.await(1000));
     }
-}
\ No newline at end of file
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSemaphoreReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true);
+
+        assertEquals(3, clientSemaphore.availablePermits());
+
+        final IgniteSemaphore srvSemaphore = srv.semaphore("semaphore1", 3, false, false);
+
+        assertEquals(3, srvSemaphore.availablePermits());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvSemaphore.acquire();
+            }
+        });
+
+        assertEquals(2, srvSemaphore.availablePermits());
+        assertEquals(2, clientSemaphore.availablePermits());
+
+        srvSemaphore.acquire();
+
+        assertEquals(1, srvSemaphore.availablePermits());
+        assertEquals(1, clientSemaphore.availablePermits());
+
+        clientSemaphore.acquire();
+
+        assertEquals(0, srvSemaphore.availablePermits());
+        assertEquals(0, clientSemaphore.availablePermits());
+
+        assertFalse(srvSemaphore.tryAcquire());
+        assertFalse(srvSemaphore.tryAcquire());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index 989f75f..bf6dcda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -267,6 +268,62 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
     /**
      * @throws Exception If failed.
      */
+    public void testSemaphore() throws Exception {
+        Ignite clientNode = clientIgnite();
+        Ignite srvNode = serverNode();
+
+        testSemaphore(clientNode, srvNode);
+        testSemaphore(srvNode, clientNode);
+    }
+
+    /**
+     * @param creator Creator node.
+     * @param other Other node.
+     * @throws Exception If failed.
+     */
+    private void testSemaphore(Ignite creator, final Ignite other) throws Exception {
+        assertNull(creator.semaphore("semaphore1", 1, true, false));
+        assertNull(other.semaphore("semaphore1", 1, true, false));
+
+        try (IgniteSemaphore semaphore = creator.semaphore("semaphore1", -1, true, true)) {
+            assertNotNull(semaphore);
+
+            assertEquals(-1, semaphore.availablePermits());
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    U.sleep(1000);
+
+                    IgniteSemaphore semaphore0 = other.semaphore("semaphore1", -1, true, false);
+
+                    assertEquals(-1, semaphore0.availablePermits());
+
+                    log.info("Release semaphore.");
+
+                    semaphore0.release(2);
+
+                    return null;
+                }
+            });
+
+            log.info("Acquire semaphore.");
+
+            assertTrue(semaphore.tryAcquire(1, 5000, TimeUnit.MILLISECONDS));
+
+            log.info("Finished wait.");
+
+            fut.get();
+
+            assertEquals(0, semaphore.availablePermits());
+        }
+
+        assertNull(creator.semaphore("semaphore1", 1, true, false));
+        assertNull(other.semaphore("semaphore1", 1, true, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testQueue() throws Exception {
         Ignite clientNode = clientIgnite();
         Ignite srvNode = serverNode();
@@ -343,4 +400,4 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
 
         return ignite;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
index f5305a2..4a21765 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMemoryMode;
@@ -239,7 +240,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
     private void testUniqueName(final boolean singleGrid) throws Exception {
         final String name = IgniteUuid.randomUuid().toString();
 
-        final int DS_TYPES = 7;
+        final int DS_TYPES = 8;
 
         final int THREADS = DS_TYPES * 3;
 
@@ -314,6 +315,12 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
 
                                     break;
 
+                                case 7:
+                                    log.info("Create atomic semaphore, grid: " + ignite.name());
+
+                                    res = ignite.semaphore(name, 0, false, true);
+
+                                    break;
                                 default:
                                     fail();
 
@@ -352,7 +359,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
                         res instanceof IgniteAtomicStamped ||
                         res instanceof IgniteCountDownLatch ||
                         res instanceof IgniteQueue ||
-                        res instanceof IgniteSet);
+                        res instanceof IgniteSet ||
+                        res instanceof IgniteSemaphore);
 
                 log.info("Data structure created: " + dataStructure);
 
@@ -371,4 +379,4 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
             dataStructure.close();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
new file mode 100644
index 0000000..977a414
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+/**
+ * Cache semaphore self test.
+ */
+public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstractTest
+    implements Externalizable {
+    /** */
+    private static final int NODES_CNT = 4;
+
+    /** */
+    protected static final int THREADS_CNT = 5;
+
+    /** */
+    private static final Random RND = new Random();
+
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return NODES_CNT;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSemaphore() throws Exception {
+        checkSemaphore();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkSemaphore() throws Exception {
+        // Test API.
+
+        checkAcquire();
+
+        checkRelease();
+
+        checkFair();
+
+        // Test main functionality.
+        IgniteSemaphore semaphore1 = grid(0).semaphore("semaphore", -2, true, true);
+
+        assertEquals(-2, semaphore1.availablePermits());
+
+        IgniteCompute comp = grid(0).compute().withAsync();
+
+        comp.call(new IgniteCallable<Object>() {
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            @LoggerResource
+            private IgniteLogger log;
+
+            @Nullable @Override public Object call() throws Exception {
+                // Test semaphore in multiple threads on each node.
+                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+                    new Callable<Object>() {
+                        @Nullable @Override public Object call() throws Exception {
+                            IgniteSemaphore semaphore = ignite.semaphore("semaphore", -2, true, true);
+
+                            assert semaphore != null && semaphore.availablePermits() == -2;
+
+                            log.info("Thread is going to wait on semaphore: " + Thread.currentThread().getName());
+
+                            assert semaphore.tryAcquire(1, 1, MINUTES);
+
+                            log.info("Thread is again runnable: " + Thread.currentThread().getName());
+
+                            semaphore.release();
+
+                            return null;
+                        }
+                    },
+                    5,
+                    "test-thread"
+                );
+
+                fut.get();
+
+                return null;
+            }
+        });
+
+        IgniteFuture<Object> fut = comp.future();
+
+        Thread.sleep(3000);
+
+        semaphore1.release(2);
+
+        assert semaphore1.availablePermits() == 0;
+
+        semaphore1.release(1);
+
+        // Ensure there are no hangs.
+        fut.get();
+
+        // Test operations on removed semaphore.
+        semaphore1.close();
+
+        checkRemovedSemaphore(semaphore1);
+    }
+
+    /**
+     * @param semaphore Semaphore.
+     * @throws Exception If failed.
+     */
+    protected void checkRemovedSemaphore(final IgniteSemaphore semaphore) throws Exception {
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return semaphore.removed();
+            }
+        }, 5000);
+
+        assert semaphore.removed();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkFair() throws Exception {
+        // Checks only if semaphore is initialized properly
+        IgniteSemaphore semaphore = createSemaphore("rmv", 5, true);
+
+        assert semaphore.isFair();
+
+        removeSemaphore("rmv");
+
+        IgniteSemaphore semaphore1 = createSemaphore("rmv1", 5, false);
+
+        assert !semaphore1.isFair();
+
+        removeSemaphore("rmv1");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkAcquire() throws Exception {
+        // Check only 'false' cases here. Successful acquire is tested over the grid.
+        IgniteSemaphore semaphore = createSemaphore("acquire", 5, false);
+
+        assert !semaphore.tryAcquire(10);
+        assert !semaphore.tryAcquire(10, 10, MICROSECONDS);
+
+        removeSemaphore("acquire");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkRelease() throws Exception {
+        IgniteSemaphore semaphore = createSemaphore("release", 5, false);
+
+        semaphore.release();
+        assert semaphore.availablePermits() == 6;
+
+        semaphore.release(2);
+        assert semaphore.availablePermits() == 8;
+
+        assert semaphore.drainPermits() == 8;
+        assert semaphore.availablePermits() == 0;
+
+        removeSemaphore("release");
+
+        checkRemovedSemaphore(semaphore);
+
+        IgniteSemaphore semaphore2 = createSemaphore("release2", -5, false);
+
+        semaphore2.release();
+
+        assert semaphore2.availablePermits() == -4;
+
+        semaphore2.release(2);
+
+        assert semaphore2.availablePermits() == -2;
+
+        assert semaphore2.drainPermits() == -2;
+
+        assert semaphore2.availablePermits() == 0;
+
+        removeSemaphore("release2");
+
+        checkRemovedSemaphore(semaphore2);
+    }
+
+    /**
+     * @param semaphoreName Semaphore name.
+     * @param numPermissions Initial number of permits.
+     * @param fair Fairness flag.
+     * @return New semaphore.
+     * @throws Exception If failed.
+     */
+    private IgniteSemaphore createSemaphore(String semaphoreName, int numPermissions, boolean fair)
+        throws Exception {
+        IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, numPermissions, fair, true);
+
+        // Test initialization.
+        assert semaphoreName.equals(semaphore.name());
+        assert semaphore.availablePermits() == numPermissions;
+        assert semaphore.getQueueLength() == 0;
+        assert semaphore.isFair() == fair;
+
+        return semaphore;
+    }
+
+    /**
+     * @param semaphoreName Semaphore name.
+     * @throws Exception If failed.
+     */
+    private void removeSemaphore(String semaphoreName)
+        throws Exception {
+        IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 10, false, true);
+
+        assert semaphore != null;
+
+        if (semaphore.availablePermits() < 0)
+            semaphore.release(-semaphore.availablePermits());
+
+        // Remove semaphore on random node.
+        IgniteSemaphore semaphore0 = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 0, false, true);
+
+        assertNotNull(semaphore0);
+
+        semaphore0.close();
+
+        // Ensure semaphore is removed on all nodes.
+        for (Ignite g : G.allGrids())
+            assertNull(((IgniteKernal)g).context().dataStructures().semaphore(semaphoreName, 10, true, false));
+
+        checkRemovedSemaphore(semaphore);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSemaphoreMultinode1() throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        IgniteSemaphore semaphore = grid(0).semaphore("s1", 0, true, true);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < gridCount(); i++) {
+            final Ignite ignite = grid(i);
+
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteSemaphore semaphore = ignite.semaphore("s1", 0, true, false);
+
+                    assertNotNull(semaphore);
+
+                    boolean wait = semaphore.tryAcquire(30_000, MILLISECONDS);
+
+                    assertTrue(wait);
+
+                    return null;
+                }
+            }));
+        }
+
+        for (int i = 0; i < 10; i++)
+            semaphore.release();
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(30_000);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
new file mode 100644
index 0000000..a516fc1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.local;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Semaphore tests for {@code LOCAL} atomics cache mode running on a single grid node.
+ */
+public class IgniteLocalSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode atomicsCacheMode() {
+        return LOCAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testSemaphore() throws Exception {
+        // Test main functionality.
+        IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true);
+
+        assertNotNull(semaphore);
+
+        assertEquals(-2, semaphore.availablePermits());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+            new Callable<Object>() {
+                @Nullable @Override public Object call() throws Exception {
+                    IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true);
+
+                    assert semaphore != null && semaphore.availablePermits() == -2;
+
+                    info("Thread is going to wait on semaphore: " + Thread.currentThread().getName());
+
+                    assert semaphore.tryAcquire(1, 1, MINUTES);
+
+                    info("Thread is again runnable: " + Thread.currentThread().getName());
+
+                    semaphore.release();
+
+                    return null;
+                }
+            },
+            THREADS_CNT,
+            "test-thread"
+        );
+
+        Thread.sleep(3000);
+
+        assert semaphore.availablePermits() == -2;
+
+        semaphore.release(2);
+
+        assert semaphore.availablePermits() == 0;
+
+        semaphore.release();
+
+        // Ensure there are no hangs.
+        fut.get();
+
+        // Test operations on removed semaphore.
+        IgniteSemaphore semaphore0 = grid(0).semaphore("semaphore", 0, false, false);
+
+        assertNotNull(semaphore0);
+
+        semaphore0.close();
+
+        checkRemovedSemaphore(semaphore0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
new file mode 100644
index 0000000..059e80a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Semaphore tests for {@code PARTITIONED} atomics cache mode.
+ */
+public class IgnitePartitionedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode atomicsCacheMode() {
+        return PARTITIONED;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
new file mode 100644
index 0000000..f58754f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.replicated;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Semaphore tests for {@code REPLICATED} atomics cache mode.
+ */
+public class IgniteReplicatedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode atomicsCacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c6852d0/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
index 2fa8940..d4ca9a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CollectionConfiguration;
@@ -60,6 +61,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
     /** Count down latch name. */
     private static final String TEST_LATCH_NAME = "test-latch";
 
+    /** Semaphore name. */
+    private static final String TEST_SEMAPHORE_NAME = "test-semaphore";
+
     /** */
     private static final CollectionConfiguration colCfg = new CollectionConfiguration();
 
@@ -69,6 +73,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
     /** Count down latch initial count. */
     private static final int LATCH_INIT_CNT = 1000;
 
+    /** Semaphore initial permits count. */
+    private static final int SEMAPHORE_INIT_CNT = 1000;
+
     /** */
     private static final boolean LONG = false;
 
@@ -88,6 +95,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
     private static final boolean LATCH = true;
 
     /** */
+    private static final boolean SEMAPHORE = true;
+
+    /** */
     private GridCacheDataStructuresLoadTest() {
         // No-op
     }
@@ -95,210 +105,247 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
     /** Atomic long write closure. */
     private final CIX1<Ignite> longWriteClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                al.addAndGet(RAND.nextInt(MAX_INT));
+                for (int i = 0; i < operationsPerTx; i++) {
+                    al.addAndGet(RAND.nextInt(MAX_INT));
 
-                long cnt = writes.incrementAndGet();
+                    long cnt = writes.incrementAndGet();
 
-                if (cnt % WRITE_LOG_MOD == 0)
-                    info("Performed " + cnt + " writes.");
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic long read closure. */
     private final CIX1<Ignite> longReadClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                al.get();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    al.get();
 
-                long cnt = reads.incrementAndGet();
+                    long cnt = reads.incrementAndGet();
 
-                if (cnt % READ_LOG_MOD == 0)
-                    info("Performed " + cnt + " reads.");
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic reference write closure. */
     private final CIX1<Ignite> refWriteClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
-                null, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
+                    null, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                ar.set(RAND.nextInt(MAX_INT));
+                for (int i = 0; i < operationsPerTx; i++) {
+                    ar.set(RAND.nextInt(MAX_INT));
 
-                long cnt = writes.incrementAndGet();
+                    long cnt = writes.incrementAndGet();
 
-                if (cnt % WRITE_LOG_MOD == 0)
-                    info("Performed " + cnt + " writes.");
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic reference read closure. */
     private final CIX1<Ignite> refReadClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
-                true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
+                    true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                ar.get();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    ar.get();
 
-                long cnt = reads.incrementAndGet();
+                    long cnt = reads.incrementAndGet();
 
-                if (cnt % READ_LOG_MOD == 0)
-                    info("Performed " + cnt + " reads.");
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic sequence write closure. */
     private final CIX1<Ignite> seqWriteClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                as.addAndGet(RAND.nextInt(MAX_INT) + 1);
+                for (int i = 0; i < operationsPerTx; i++) {
+                    as.addAndGet(RAND.nextInt(MAX_INT) + 1);
 
-                long cnt = writes.incrementAndGet();
+                    long cnt = writes.incrementAndGet();
 
-                if (cnt % WRITE_LOG_MOD == 0)
-                    info("Performed " + cnt + " writes.");
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic sequence read closure. */
     private final CIX1<Ignite> seqReadClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                as.get();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    as.get();
 
-                long cnt = reads.incrementAndGet();
+                    long cnt = reads.incrementAndGet();
 
-                if (cnt % READ_LOG_MOD == 0)
-                    info("Performed " + cnt + " reads.");
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic stamped write closure. */
     private final CIX1<Ignite> stampWriteClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
-                0, 0, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
+                    0, 0, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
+                for (int i = 0; i < operationsPerTx; i++) {
+                    as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
 
-                long cnt = writes.incrementAndGet();
+                    long cnt = writes.incrementAndGet();
 
-                if (cnt % WRITE_LOG_MOD == 0)
-                    info("Performed " + cnt + " writes.");
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
             }
-        }
-    };
+        };
 
     /** Atomic stamped read closure. */
     private final CIX1<Ignite> stampReadClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
-                0, 0, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
+                    0, 0, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                as.get();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    as.get();
 
-                long cnt = reads.incrementAndGet();
+                    long cnt = reads.incrementAndGet();
 
-                if (cnt % READ_LOG_MOD == 0)
-                    info("Performed " + cnt + " reads.");
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
             }
-        }
-    };
+        };
 
     /** Queue write closure. */
     private final CIX1<Ignite> queueWriteClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
+            @Override public void applyx(Ignite ignite) {
+                IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                q.put(RAND.nextInt(MAX_INT));
+                for (int i = 0; i < operationsPerTx; i++) {
+                    q.put(RAND.nextInt(MAX_INT));
 
-                long cnt = writes.incrementAndGet();
+                    long cnt = writes.incrementAndGet();
 
-                if (cnt % WRITE_LOG_MOD == 0)
-                    info("Performed " + cnt + " writes.");
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
             }
-        }
-    };
+        };
 
     /** Queue read closure. */
     private final CIX1<Ignite> queueReadClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
+            @Override public void applyx(Ignite ignite) {
+                IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                q.peek();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    q.peek();
 
-                long cnt = reads.incrementAndGet();
+                    long cnt = reads.incrementAndGet();
 
-                if (cnt % READ_LOG_MOD == 0)
-                    info("Performed " + cnt + " reads.");
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
             }
-        }
-    };
+        };
 
     /** Count down latch write closure. */
     private final CIX1<Ignite> latchWriteClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                l.countDown();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    l.countDown();
 
-                long cnt = writes.incrementAndGet();
+                    long cnt = writes.incrementAndGet();
 
-                if (cnt % WRITE_LOG_MOD == 0)
-                    info("Performed " + cnt + " writes.");
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
             }
-        }
-    };
+        };
 
     /** Count down latch read closure. */
     private final CIX1<Ignite> latchReadClos =
         new CIX1<Ignite>() {
-        @Override public void applyx(Ignite ignite) {
-            IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
+            @Override public void applyx(Ignite ignite) {
+                IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
 
-            for (int i = 0; i < operationsPerTx; i++) {
-                l.count();
+                for (int i = 0; i < operationsPerTx; i++) {
+                    l.count();
 
-                long cnt = reads.incrementAndGet();
+                    long cnt = reads.incrementAndGet();
 
-                if (cnt % READ_LOG_MOD == 0)
-                    info("Performed " + cnt + " reads.");
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
             }
-        }
-    };
+        };
+
+    /** Semaphore write closure. */
+    private final CIX1<Ignite> semaphoreWriteClos =
+        new CIX1<Ignite>() {
+            @Override public void applyx(Ignite ignite) {
+                IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true);
+
+                for (int i = 0; i < operationsPerTx; i++) {
+                    if ((i % 2) == 0)
+                        s.release();
+                    else
+                        s.acquire();
+
+                    long cnt = writes.incrementAndGet();
+
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
+            }
+        };
+
+    /** Semaphore read closure. */
+    private final CIX1<Ignite> semaphoreReadClos =
+        new CIX1<Ignite>() {
+            @Override public void applyx(Ignite ignite) {
+                IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true);
+
+                for (int i = 0; i < operationsPerTx; i++) {
+                    s.availablePermits();
+
+                    long cnt = reads.incrementAndGet();
+
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
+            }
+        };
 
     /**
      * @param args Arguments.
@@ -362,6 +409,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
 
                 test.loadTestIgnite(test.latchWriteClos, test.latchReadClos);
             }
+
+            System.gc();
+
+            if (SEMAPHORE) {
+                info("Testing semaphore...");
+
+                test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos);
+            }
         }
     }
 
@@ -407,7 +462,7 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
                 @Nullable @Override public Object call() throws Exception {
                     long start = System.currentTimeMillis();
 
-                    while(!done.get()) {
+                    while (!done.get()) {
                         if (tx) {
                             try (Transaction tx = ignite.transactions().txStart()) {
                                 readClos.apply(ignite);
@@ -447,4 +502,4 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}


[3/6] ignite git commit: Fixes formatting issues;

Posted by yz...@apache.org.
Fixes formatting issues;


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/77f9deb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/77f9deb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/77f9deb7

Branch: refs/heads/ignite-638
Commit: 77f9deb7f79acb04b73994e83efd4539646073e6
Parents: be332a8
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Sat Oct 3 12:44:02 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Sat Oct 3 12:44:02 2015 +0200

----------------------------------------------------------------------
 .../datastructures/GridCacheSemaphoreImpl.java  | 158 +++++++++++--------
 .../datastructures/GridCacheSemaphoreState.java |  12 +-
 2 files changed, 97 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/77f9deb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 17efc61..d2a966f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -96,30 +96,61 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     abstract class Sync extends AbstractQueuedSynchronizer {
         private static final long serialVersionUID = 1192457210091910933L;
 
+        /** Thread map. */
         protected final ConcurrentMap<Thread, Integer> threadMap;
+
+        /** Total number of threads currently waiting on this semaphore. */
         protected int totalWaiters;
 
-        Sync(int permits) {
+        protected Sync(int permits) {
             setState(permits);
             threadMap = new ConcurrentHashMap<>();
         }
 
+        /**
+         * Sets the estimate of the total current number of threads waiting on this semaphore. This method should only
+         * be called in {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+         *
+         * @param waiters Thread count.
+         */
         protected synchronized void setWaiters(int waiters) {
             totalWaiters = waiters;
         }
 
+        /**
+         * Gets the number of waiting threads.
+         *
+         * @return Number of threads waiting at this semaphore.
+         */
         public int getWaiters() {
             return totalWaiters;
         }
 
+        /**
+         * Sets the number of permits currently available on this semaphore. This method should only be used in
+         * {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+         *
+         * @param permits Number of permits available at this semaphore.
+         */
         final synchronized void setPermits(int permits) {
             setState(permits);
         }
 
+        /**
+         * Gets the number of permits currently available.
+         *
+         * @return Number of permits available at this semaphore.
+         */
         final int getPermits() {
             return getState();
         }
 
+        /**
+         * This method is used by the AQS to test if the current thread should block or not.
+         *
+         * @param acquires Number of permits to acquire.
+         * @return Negative number if thread should block, non-negative if thread successfully acquires permits.
+         */
         final int nonfairTryAcquireShared(int acquires) {
             for (; ; ) {
                 int available = getState();
@@ -137,7 +168,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             }
         }
 
-        protected final boolean tryReleaseShared(int releases) {
+        /** {@inheritDoc} */
+        @Override protected final boolean tryReleaseShared(int releases) {
             // Check if some other node updated the state.
             // This method is called with release==0 only when trying to wake through update.
             if (releases == 0)
@@ -156,20 +188,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             }
         }
 
-        final void reducePermits(int reductions) {
-            for (; ; ) {
-                int current = getState();
-
-                int next = current - reductions;
-
-                if (next > current) // underflow
-                    throw new Error("Permit count underflow");
-
-                if (compareAndSetGlobalState(current, next))
-                    return;
-            }
-        }
-
+        /**
+         * This method is used internally to implement {@linkplain GridCacheSemaphoreImpl#drainPermits()}.
+         *
+         * @return Number of permits to drain.
+         */
         final int drainPermits() {
             for (; ; ) {
 
@@ -180,12 +203,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             }
         }
 
+        /**
+         * This method is used when a thread blocks on this semaphore to synchronize the waiting thread counter across
+         * all nodes.
+         */
         protected void getAndIncWaitingCount() {
             try {
                 CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
-                        @Override
-                        public Boolean call() throws Exception {
+                        @Override public Boolean call() throws Exception {
                             try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheSemaphoreState val = semaphoreView.get(key);
 
@@ -221,12 +247,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             }
         }
 
+        /**
+         * This method is used for synchronizing the semaphore state across all nodes.
+         */
         protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
             try {
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
-                        @Override
-                        public Boolean call() throws Exception {
+                        @Override public Boolean call() throws Exception {
                             try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheSemaphoreState val = semaphoreView.get(key);
 
@@ -281,7 +309,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             super(permits);
         }
 
-        protected int tryAcquireShared(int acquires) {
+        /** {@inheritDoc} */
+        @Override protected int tryAcquireShared(int acquires) {
             return nonfairTryAcquireShared(acquires);
         }
     }
@@ -296,7 +325,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             super(permits);
         }
 
-        protected int tryAcquireShared(int acquires) {
+        /** {@inheritDoc} */
+        @Override protected int tryAcquireShared(int acquires) {
             for (; ; ) {
                 if (hasQueuedPredecessors())
                     return -1;
@@ -355,8 +385,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             try {
                 sync = CU.outTx(
                     retryTopologySafe(new Callable<Sync>() {
-                        @Override
-                        public Sync call() throws Exception {
+                        @Override public Sync call() throws Exception {
                             try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheSemaphoreState val = semaphoreView.get(key);
 
@@ -428,18 +457,18 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         sync.releaseShared(0);
     }
 
-    @Override
-    public void needCheckNotRemoved() {
+    /** {@inheritDoc} */
+    @Override public void needCheckNotRemoved() {
         // No-op.
     }
 
-    @Override
-    public void acquire() throws IgniteException {
+    /** {@inheritDoc} */
+    @Override public void acquire() throws IgniteException {
         acquire(1);
     }
 
-    @Override
-    public void acquire(int permits) throws IgniteInterruptedException {
+    /** {@inheritDoc} */
+    @Override public void acquire(int permits) throws IgniteInterruptedException {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
@@ -455,8 +484,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public void acquireUninterruptibly() {
+    /** {@inheritDoc} */
+    @Override public void acquireUninterruptibly() {
         try {
             initializeSemaphore();
 
@@ -467,8 +496,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public void acquireUninterruptibly(int permits) {
+    /** {@inheritDoc} */
+    @Override public void acquireUninterruptibly(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
@@ -480,15 +509,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public int availablePermits() {
+    /** {@inheritDoc} */
+    @Override public int availablePermits() {
         int ret;
         try {
             initializeSemaphore();
             ret = CU.outTx(
                 retryTopologySafe(new Callable<Integer>() {
-                    @Override
-                    public Integer call() throws Exception {
+                    @Override public Integer call() throws Exception {
                         try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
                             GridCacheSemaphoreState val = semaphoreView.get(key);
 
@@ -512,8 +540,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         return ret;
     }
 
-    @Override
-    public int drainPermits() {
+    /** {@inheritDoc} */
+    @Override public int drainPermits() {
         try {
             initializeSemaphore();
 
@@ -524,8 +552,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public boolean tryAcquire() {
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire() {
         try {
             initializeSemaphore();
 
@@ -536,8 +564,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
         try {
             initializeSemaphore();
 
@@ -551,13 +579,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public void release() {
+    /** {@inheritDoc} */
+    @Override public void release() {
         release(1);
     }
 
-    @Override
-    public void release(int permits) {
+    /** {@inheritDoc} */
+    @Override public void release(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
@@ -570,8 +598,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public boolean tryAcquire(int permits) {
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
@@ -584,8 +612,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
@@ -600,13 +628,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public boolean isFair() {
-        return false;
+    /** {@inheritDoc} */
+    @Override public boolean isFair() {
+        return isFair;
     }
 
-    @Override
-    public boolean hasQueuedThreads() {
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThreads() {
         try {
             initializeSemaphore();
 
@@ -617,8 +645,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public int getQueueLength() {
+    /** {@inheritDoc} */
+    @Override public int getQueueLength() {
         try {
             initializeSemaphore();
 
@@ -629,22 +657,22 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx.kernalContext());
         out.writeUTF(name);
     }
 
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         IgniteBiTuple<GridKernalContext, String> t = stash.get();
 
         t.set1((GridKernalContext)in.readObject());
         t.set2(in.readUTF());
     }
 
-    @Override
-    public void close() {
+    /** {@inheritDoc} */
+    @Override public void close() {
         if (!rmvd) {
             try {
                 ctx.kernalContext().dataStructures().removeSemaphore(name);

http://git-wip-us.apache.org/repos/asf/ignite/blob/77f9deb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index a02b7c9..e25649e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -87,16 +87,14 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * {@inheritDoc}
      */
-    @Override
-    public Object clone() throws CloneNotSupportedException {
+    @Override public Object clone() throws CloneNotSupportedException {
         return super.clone();
     }
 
     /**
      * {@inheritDoc}
      */
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(cnt);
         out.writeInt(waiters);
         out.writeBoolean(fair);
@@ -105,8 +103,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * {@inheritDoc}
      */
-    @Override
-    public void readExternal(ObjectInput in) throws IOException {
+    @Override public void readExternal(ObjectInput in) throws IOException {
         cnt = in.readInt();
         waiters = in.readInt();
         fair = in.readBoolean();
@@ -115,8 +112,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * {@inheritDoc}
      */
-    @Override
-    public String toString() {
+    @Override public String toString() {
         return S.toString(GridCacheSemaphoreState.class, this);
     }
 }