You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2017/04/12 02:50:48 UTC
[3/7] ignite git commit: ignite-1977 - fixed IgniteSemaphore fault
tolerance.
ignite-1977 - fixed IgniteSemaphore fault tolerance.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/902bf42c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/902bf42c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/902bf42c
Branch: refs/heads/ignite-1192
Commit: 902bf42c36f46b0aaa605b779a699eb8e0c0aca3
Parents: aeacad6
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Tue Apr 11 14:09:12 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Apr 11 14:09:12 2017 +0300
----------------------------------------------------------------------
.../datastructures/GridCacheSemaphoreImpl.java | 74 +++++++++++++++++---
.../datastructures/GridCacheSemaphoreState.java | 22 ++++++
...eAbstractDataStructuresFailoverSelfTest.java | 21 ++++--
3 files changed, 102 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index a1c0515..159e735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -232,6 +232,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** {@inheritDoc} */
@Override protected final boolean tryReleaseShared(int releases) {
+ // Fail-fast path.
+ if(broken)
+ return true;
+
// Check if some other node updated the state.
// This method is called with release==0 only when trying to wake through update.
if (releases == 0)
@@ -295,6 +299,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
throw new IgniteCheckedException("Failed to find semaphore with given name: " +
name);
+ // Abort if state is already broken.
+ if (val.isBroken()) {
+ tx.rollback();
+
+ return true;
+ }
+
boolean retVal = val.getCount() == expVal;
if (retVal) {
@@ -349,11 +360,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/**
* This method is used for releasing the permits acquired by failing node.
+ * If {@code broken} is set, no permits are released and the semaphore is instead marked (globally) as broken.
*
* @param nodeId ID of the failing node.
+ * @param broken Flag indicating that this semaphore is broken.
* @return True if this is the call that succeeded to change the global state.
*/
- protected boolean releaseFailedNode(final UUID nodeId) {
+ protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) {
try {
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@@ -369,6 +382,25 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
throw new IgniteCheckedException("Failed to find semaphore with given name: " +
name);
+ // Quit early if semaphore is already broken.
+ if( val.isBroken()) {
+ tx.rollback();
+
+ return false;
+ }
+
+ // Mark semaphore as broken. No permits are released,
+ // since semaphore is useless from now on.
+ if (broken) {
+ val.setBroken(true);
+
+ semView.put(key, val);
+
+ tx.commit();
+
+ return true;
+ }
+
Map<UUID, Integer> map = val.getWaiters();
if (!map.containsKey(nodeId)) {
@@ -473,7 +505,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
tx.commit();
- return new Sync(cnt, waiters, failoverSafe);
+ Sync sync = new Sync(cnt, waiters, failoverSafe);
+
+ sync.setBroken(val.isBroken());
+
+ return sync;
}
}
}),
@@ -520,6 +556,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (sync == null)
return;
+ // Update broken flag.
+ sync.setBroken(val.isBroken());
+
// Update permission count.
sync.setPermits(val.getCount());
@@ -535,10 +574,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
int numPermits = sync.getPermitsForNode(nodeId);
if (numPermits > 0) {
- if (sync.failoverSafe)
- // Release permits acquired by threads on failing node.
- sync.releaseFailedNode(nodeId);
- else {
+ // The semaphore is broken if this point is reached in non-failover-safe mode.
+ boolean broken = !sync.failoverSafe;
+
+ // Release permits acquired by threads on the failing node, or mark the semaphore as broken if it is not failover safe.
+ sync.releaseFailedNode(nodeId, broken);
+
+ if (broken) {
// Interrupt every waiting thread if this semaphore is not failover safe.
sync.setBroken(true);
@@ -614,8 +656,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.acquireSharedInterruptibly(permits);
- if (isBroken())
+ if (isBroken()) {
+ Thread.interrupted(); // Clear interrupt flag.
+
throw new InterruptedException();
+ }
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -731,8 +776,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
boolean result = sync.nonfairTryAcquireShared(1) >= 0;
- if (isBroken())
+ if (isBroken()) {
+ Thread.interrupted(); // Clear interrupt flag.
+
throw new InterruptedException();
+ }
return result;
}
@@ -756,8 +804,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- if (isBroken())
+ if (isBroken()) {
+ Thread.interrupted(); // Clear interrupt flag.
+
throw new InterruptedException();
+ }
return result;
}
@@ -825,8 +876,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
- if (isBroken())
+ if (isBroken()) {
+ Thread.interrupted();
+
throw new InterruptedException();
+ }
return result;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index 50cdf10..cdff9c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -46,6 +46,9 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/** FailoverSafe flag. */
private boolean failoverSafe;
+ /** Flag indicating that semaphore is no longer safe to use. */
+ private boolean broken;
+
/**
* Constructor.
*
@@ -101,6 +104,21 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
return failoverSafe;
}
+ /**
+ * @return {@code true} if this semaphore is broken and no longer safe to use.
+ */
+ public boolean isBroken() {
+ return broken;
+ }
+
+ /**
+ *
+ * @param broken Flag indicating that this semaphore should no longer be used.
+ */
+ public void setBroken(boolean broken) {
+ this.broken = broken;
+ }
+
/** {@inheritDoc} */
@Override public Object clone() throws CloneNotSupportedException {
return super.clone();
@@ -120,6 +138,8 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
out.writeInt(e.getValue());
}
}
+
+ out.writeBoolean(broken);
}
/** {@inheritDoc} */
@@ -135,6 +155,8 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
for (int i = 0; i < size; i++)
waiters.put(U.readUuid(in), in.readInt());
}
+
+ broken = in.readBoolean();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/902bf42c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 285ea6e..f918acd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -530,8 +530,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
private void doTestSemaphore(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe) throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1977");
-
final int permits = topWorker instanceof MultipleTopologyChangeWorker ||
topWorker instanceof PartitionedMultipleTopologyChangeWorker ? TOP_CHANGE_THREAD_CNT * 3 :
TOP_CHANGE_CNT;
@@ -548,9 +546,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
break;
}
catch (IgniteInterruptedException e) {
- // Exception may happen in non failover safe mode.
+ // Exception may happen in non failover safe mode.
if (failoverSafe)
throw e;
+ else {
+ // In non-failoverSafe mode semaphore is not safe to be reused,
+ // and should always be discarded after exception is caught.
+ break;
+ }
}
}
@@ -569,6 +572,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
// Exception may happen in non failover safe mode.
if (failoverSafe)
throw e;
+ else {
+ // In non-failoverSafe mode semaphore is not safe to be reused,
+ // and should always be discarded after exception is caught.
+ break;
+ }
}
}
@@ -581,8 +589,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
- for (Ignite g : G.allGrids())
- assertEquals(permits, g.semaphore(STRUCTURE_NAME, permits, false, false).availablePermits());
+ // Semaphore is left in proper state only if failoverSafe mode is used.
+ if (failoverSafe) {
+ for (Ignite g : G.allGrids())
+ assertEquals(permits, g.semaphore(STRUCTURE_NAME, permits, false, false).availablePermits());
+ }
}
}