You are viewing a plain-text version of this content. The canonical version is available in the commits@ignite.apache.org mailing-list archive.
Posted to commits@ignite.apache.org by is...@apache.org on 2017/04/12 12:46:59 UTC

[02/13] ignite git commit: ignite-1977 - fixed IgniteSemaphore fault tolerance.

ignite-1977 - fixed IgniteSemaphore fault tolerance.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/902bf42c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/902bf42c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/902bf42c

Branch: refs/heads/ignite-3477-master
Commit: 902bf42c36f46b0aaa605b779a699eb8e0c0aca3
Parents: aeacad6
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Tue Apr 11 14:09:12 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Apr 11 14:09:12 2017 +0300

----------------------------------------------------------------------
 .../datastructures/GridCacheSemaphoreImpl.java  | 74 +++++++++++++++++---
 .../datastructures/GridCacheSemaphoreState.java | 22 ++++++
 ...eAbstractDataStructuresFailoverSelfTest.java | 21 ++++--
 3 files changed, 102 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index a1c0515..159e735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -232,6 +232,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
         /** {@inheritDoc} */
         @Override protected final boolean tryReleaseShared(int releases) {
+            // Fail-fast path.
+            if(broken)
+                return true;
+
             // Check if some other node updated the state.
             // This method is called with release==0 only when trying to wake through update.
             if (releases == 0)
@@ -295,6 +299,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                     throw new IgniteCheckedException("Failed to find semaphore with given name: " +
                                         name);
 
+                                // Abort if state is already broken.
+                                if (val.isBroken()) {
+                                    tx.rollback();
+
+                                    return true;
+                                }
+
                                 boolean retVal = val.getCount() == expVal;
 
                                 if (retVal) {
@@ -349,11 +360,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
         /**
          * This method is used for releasing the permits acquired by failing node.
+         * In case the semaphore is broken, no permits are released and semaphore is set (globally) to broken state.
          *
          * @param nodeId ID of the failing node.
+         * @param broken Flag indicating that this semaphore is broken.
          * @return True if this is the call that succeeded to change the global state.
          */
-        protected boolean releaseFailedNode(final UUID nodeId) {
+        protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) {
             try {
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
@@ -369,6 +382,25 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                     throw new IgniteCheckedException("Failed to find semaphore with given name: " +
                                         name);
 
+                                // Quit early if semaphore is already broken.
+                                if( val.isBroken()) {
+                                    tx.rollback();
+
+                                    return false;
+                                }
+
+                                // Mark semaphore as broken. No permits are released,
+                                // since semaphore is useless from now on.
+                                if (broken) {
+                                    val.setBroken(true);
+
+                                    semView.put(key, val);
+
+                                    tx.commit();
+
+                                    return true;
+                                }
+
                                 Map<UUID, Integer> map = val.getWaiters();
 
                                 if (!map.containsKey(nodeId)) {
@@ -473,7 +505,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
                                 tx.commit();
 
-                                return new Sync(cnt, waiters, failoverSafe);
+                                Sync sync = new Sync(cnt, waiters, failoverSafe);
+
+                                sync.setBroken(val.isBroken());
+
+                                return sync;
                             }
                         }
                     }),
@@ -520,6 +556,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         if (sync == null)
             return;
 
+        // Update broken flag.
+        sync.setBroken(val.isBroken());
+
         // Update permission count.
         sync.setPermits(val.getCount());
 
@@ -535,10 +574,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         int numPermits = sync.getPermitsForNode(nodeId);
 
         if (numPermits > 0) {
-            if (sync.failoverSafe)
-                // Release permits acquired by threads on failing node.
-                sync.releaseFailedNode(nodeId);
-            else {
+            // Semaphore is broken if reaches this point in non-failover safe mode.
+            boolean broken = !sync.failoverSafe;
+
+            // Release permits acquired by threads on failing node.
+            sync.releaseFailedNode(nodeId, broken);
+
+            if (broken) {
                 // Interrupt every waiting thread if this semaphore is not failover safe.
                 sync.setBroken(true);
 
@@ -614,8 +656,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
             sync.acquireSharedInterruptibly(permits);
 
-            if (isBroken())
+            if (isBroken()) {
+                Thread.interrupted(); // Clear interrupt flag.
+
                 throw new InterruptedException();
+            }
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -731,8 +776,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
             boolean result = sync.nonfairTryAcquireShared(1) >= 0;
 
-            if (isBroken())
+            if (isBroken()) {
+                Thread.interrupted(); // Clear interrupt flag.
+
                 throw new InterruptedException();
+            }
 
             return result;
         }
@@ -756,8 +804,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
             boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
 
-            if (isBroken())
+            if (isBroken()) {
+                Thread.interrupted(); // Clear interrupt flag.
+
                 throw new InterruptedException();
+            }
 
             return result;
         }
@@ -825,8 +876,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
             boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
 
-            if (isBroken())
+            if (isBroken()) {
+                Thread.interrupted();
+
                 throw new InterruptedException();
+            }
 
             return result;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index 50cdf10..cdff9c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -46,6 +46,9 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /** FailoverSafe flag. */
     private boolean failoverSafe;
 
+    /** Flag indicating that semaphore is no longer safe to use. */
+    private boolean broken;
+
     /**
      * Constructor.
      *
@@ -101,6 +104,21 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
         return failoverSafe;
     }
 
+    /**
+     * @return broken flag.
+     */
+    public boolean isBroken() {
+        return broken;
+    }
+
+    /**
+     *
+     * @param broken Flag indicating that this semaphore should be no longer used.
+     */
+    public void setBroken(boolean broken) {
+        this.broken = broken;
+    }
+
     /** {@inheritDoc} */
     @Override public Object clone() throws CloneNotSupportedException {
         return super.clone();
@@ -120,6 +138,8 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
                 out.writeInt(e.getValue());
             }
         }
+
+        out.writeBoolean(broken);
     }
 
     /** {@inheritDoc} */
@@ -135,6 +155,8 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
             for (int i = 0; i < size; i++)
                 waiters.put(U.readUuid(in), in.readInt());
         }
+
+        broken = in.readBoolean();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 285ea6e..f918acd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -530,8 +530,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     private void doTestSemaphore(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe) throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1977");
-
         final int permits = topWorker instanceof MultipleTopologyChangeWorker ||
             topWorker instanceof PartitionedMultipleTopologyChangeWorker ? TOP_CHANGE_THREAD_CNT * 3 :
             TOP_CHANGE_CNT;
@@ -548,9 +546,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                             break;
                         }
                         catch (IgniteInterruptedException e) {
-                            // Exception may happen in non failover safe mode.
+                           // Exception may happen in non failover safe mode.
                             if (failoverSafe)
                                 throw e;
+                            else {
+                                // In non-failoverSafe mode semaphore is not safe to be reused,
+                                // and should always be discarded after exception is caught.
+                                break;
+                            }
                         }
                     }
 
@@ -569,6 +572,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                         // Exception may happen in non failover safe mode.
                         if (failoverSafe)
                             throw e;
+                        else {
+                            // In non-failoverSafe mode semaphore is not safe to be reused,
+                            // and should always be discarded after exception is caught.
+                            break;
+                        }
                     }
                 }
 
@@ -581,8 +589,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
             fut.get();
 
-            for (Ignite g : G.allGrids())
-                assertEquals(permits, g.semaphore(STRUCTURE_NAME, permits, false, false).availablePermits());
+            // Semaphore is left in proper state only if failoverSafe mode is used.
+            if (failoverSafe) {
+                for (Ignite g : G.allGrids())
+                    assertEquals(permits, g.semaphore(STRUCTURE_NAME, permits, false, false).availablePermits());
+            }
         }
     }