You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/15 11:41:33 UTC

[04/22] ignite git commit: IGNITE-2735 - Interrupt all acquires on local node after ignite.close - Fixes #525.

IGNITE-2735 - Interrupt all acquires on local node after ignite.close - Fixes #525.

Signed-off-by: Valentin Kulichenko <va...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b63cee45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b63cee45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b63cee45

Branch: refs/heads/ignite-1786
Commit: b63cee457007bfcf91bd4d0415c57a1b05647b7b
Parents: 053af5d
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Mar 10 21:02:52 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Mar 10 21:02:52 2016 -0800

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java |   5 +
 .../datastructures/GridCacheSemaphoreEx.java    |   5 +
 .../datastructures/GridCacheSemaphoreImpl.java  | 161 +++++++++++++++++--
 ...eAbstractDataStructuresFailoverSelfTest.java |  44 +++++
 4 files changed, 206 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 445fc3e..0b02abd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -259,6 +259,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     @Override public void onKernalStop(boolean cancel) {
         super.onKernalStop(cancel);
 
+        for (GridCacheRemovable ds : dsMap.values()) {
+            if (ds instanceof GridCacheSemaphoreEx)
+                ((GridCacheSemaphoreEx)ds).stop();
+        }
+
         if (initLatch.getCount() > 0) {
             initFailed = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 4d39635..b49d6b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -44,4 +44,9 @@ public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovabl
      * @param nodeId Id of the node that left the grid.
      */
     public void onNodeRemoved(UUID nodeId);
+
+    /**
+     * Callback to notify local semaphore instance on node stop.
+     */
+    public void stop();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 37df9d5..2c60e8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -179,6 +179,29 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
 
         /**
+         * Checks the flag indicating that it is not safe to continue using this semaphore.
+         * This is the case only if one of two things happened:
+         * 1. A node that previously acquired on this semaphore failed and
+         * semaphore is created in non-failoversafe mode;
+         * 2. Local node failed (is closed), so any threads on this node
+         * waiting to acquire are notified, and semaphore is not safe to be used anymore.
+         *
+         * @return True if semaphore is not safe to be used anymore.
+         */
+        protected boolean isBroken() {
+            return broken;
+        }
+
+        /** Sets the flag indicating that a node failed and it is not safe to continue using this semaphore.
+         * Any attempt to acquire on broken semaphore will result in {@linkplain IgniteInterruptedException}.
+         *
+         * @param broken True if semaphore should not be used anymore.
+         * */
+        protected void setBroken(boolean broken) {
+            this.broken = broken;
+        }
+
+        /**
          * This method is used by the AQS to test if the current thread should block or not.
          *
          * @param acquires Number of permits to acquire.
@@ -186,6 +209,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
          */
         final int nonfairTryAcquireShared(int acquires) {
             for (;;) {
+                // If broken, return immediately, exception will be thrown anyway.
+                if(broken)
+                    return 1;
+
                 int available = getState();
 
                 int remaining = available - acquires;
@@ -209,6 +236,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 return true;
 
             for (;;) {
+                // If broken, return immediately, exception will be thrown anyway.
+                if(broken)
+                    return true;
+
                 int cur = getState();
 
                 int next = cur + releases;
@@ -228,6 +259,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
          */
         final int drainPermits() {
             for (;;) {
+                // If broken, return immediately, exception will be thrown anyway.
+                if(broken)
+                    return 1;
 
                 int current = getState();
 
@@ -504,7 +538,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 sync.releaseFailedNode(nodeId);
             else {
                 // Interrupt every waiting thread if this semaphore is not failover safe.
-                sync.broken = true;
+                sync.setBroken(true);
 
                 for (Thread t : sync.getSharedQueuedThreads())
                     t.interrupt();
@@ -515,6 +549,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
+    @Override public void stop() {
+        sync.setBroken(true);
+
+        // Try to notify any waiting threads.
+        sync.releaseShared(0);
+    }
+
     /** {@inheritDoc} */
     @Override public void needCheckNotRemoved() {
         // No-op.
@@ -527,15 +568,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
     /** {@inheritDoc} */
     @Override public void acquire(int permits) throws IgniteInterruptedException {
+        ctx.kernalContext().gateway().readLock();
+
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
             initializeSemaphore();
 
-            if(isBroken())
-                Thread.currentThread().interrupt();
-
             sync.acquireSharedInterruptibly(permits);
+
+            if(isBroken())
+                throw new InterruptedException();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -543,10 +586,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void acquireUninterruptibly() {
+        ctx.kernalContext().gateway().readLock();
+
         try {
             initializeSemaphore();
 
@@ -555,10 +603,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void acquireUninterruptibly(int permits) {
+        ctx.kernalContext().gateway().readLock();
+
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
@@ -569,10 +622,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public int availablePermits() {
+        ctx.kernalContext().gateway().readLock();
+
         int ret;
         try {
             initializeSemaphore();
@@ -603,12 +661,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
 
         return ret;
     }
 
     /** {@inheritDoc} */
     @Override public int drainPermits() {
+        ctx.kernalContext().gateway().readLock();
+
         try {
             initializeSemaphore();
 
@@ -617,26 +680,49 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean tryAcquire() {
+        ctx.kernalContext().gateway().readLock();
+
         try {
             initializeSemaphore();
 
-            return sync.nonfairTryAcquireShared(1) >= 0;
+            boolean result = sync.nonfairTryAcquireShared(1) >= 0;
+
+            if(isBroken())
+                throw new InterruptedException();
+
+            return result;
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+        ctx.kernalContext().gateway().readLock();
+
         try {
             initializeSemaphore();
 
-            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+            boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+
+            if(isBroken())
+                throw new InterruptedException();
+
+            return result;
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -644,6 +730,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
@@ -653,6 +742,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
     /** {@inheritDoc} */
     @Override public void release(int permits) {
+        ctx.kernalContext().gateway().readLock();
+
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
@@ -663,10 +754,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean tryAcquire(int permits) {
+        ctx.kernalContext().gateway().readLock();
+
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
 
         try {
@@ -677,15 +773,25 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+        ctx.kernalContext().gateway().readLock();
+
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
 
-            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+            boolean result =  sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+
+            if(isBroken())
+                throw new InterruptedException();
+
+            return result;
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -693,15 +799,32 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean isFailoverSafe() {
-        return sync.failoverSafe;
+        ctx.kernalContext().gateway().readLock();
+
+        try {
+            initializeSemaphore();
+
+            return sync.failoverSafe;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean hasQueuedThreads() {
+        ctx.kernalContext().gateway().readLock();
+
         try {
             initializeSemaphore();
 
@@ -710,10 +833,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public int getQueueLength() {
+        ctx.kernalContext().gateway().readLock();
+
         try {
             initializeSemaphore();
 
@@ -722,11 +850,26 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public boolean isBroken(){
-        return sync.broken;
+        ctx.kernalContext().gateway().readLock();
+
+        try {
+            initializeSemaphore();
+
+            return sync.isBroken();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index e85468e..fbd72bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -454,6 +454,50 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      * @throws Exception If failed.
      */
+    public void testSemaphoreSingleNodeFailure() throws Exception {
+        final Ignite i1 = grid(0);
+
+        IgniteSemaphore sem1 = i1.semaphore(STRUCTURE_NAME, 1, false, true);
+
+        sem1.acquire();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                boolean failed = true;
+
+                IgniteSemaphore sem2 = i1.semaphore(STRUCTURE_NAME, 1, false, true);
+
+                try {
+                    sem2.acquire();
+                }
+                catch (Exception e){
+                    failed = false;
+                }
+                finally {
+                    assertFalse(failed);
+
+                    sem2.release();
+                }
+                return null;
+            }
+        });
+
+        while(!sem1.hasQueuedThreads()){
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                fail();
+            }
+        }
+
+        i1.close();
+
+        fut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception {
         doTestSemaphore(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true);
     }