You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/22 17:49:38 UTC
[02/50] [abbrv] ignite git commit: IGNITE-2735 - Interrupt all
acquires on local node after ignite.close - Fixes #525.
IGNITE-2735 - Interrupt all acquires on local node after ignite.close - Fixes #525.
Signed-off-by: Valentin Kulichenko <va...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b63cee45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b63cee45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b63cee45
Branch: refs/heads/ignite-2004
Commit: b63cee457007bfcf91bd4d0415c57a1b05647b7b
Parents: 053af5d
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Thu Mar 10 21:02:52 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Mar 10 21:02:52 2016 -0800
----------------------------------------------------------------------
.../datastructures/DataStructuresProcessor.java | 5 +
.../datastructures/GridCacheSemaphoreEx.java | 5 +
.../datastructures/GridCacheSemaphoreImpl.java | 161 +++++++++++++++++--
...eAbstractDataStructuresFailoverSelfTest.java | 44 +++++
4 files changed, 206 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 445fc3e..0b02abd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -259,6 +259,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
@Override public void onKernalStop(boolean cancel) {
super.onKernalStop(cancel);
+ for (GridCacheRemovable ds : dsMap.values()) {
+ if (ds instanceof GridCacheSemaphoreEx)
+ ((GridCacheSemaphoreEx)ds).stop();
+ }
+
if (initLatch.getCount() > 0) {
initFailed = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 4d39635..b49d6b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -44,4 +44,9 @@ public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovabl
* @param nodeId Id of the node that left the grid.
*/
public void onNodeRemoved(UUID nodeId);
+
+ /**
+ * Callback to notify local semaphore instance on node stop.
+ */
+ public void stop();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 37df9d5..2c60e8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -179,6 +179,29 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
/**
+ * Checks the flag indicating that it is not safe to continue using this semaphore.
+ * This is the case only if one of two things happened:
+ * 1. A node that previously acquired on this semaphore failed and
+ * semaphore is created in non-failoversafe mode;
+ * 2. Local node failed (is closed), so any threads on this node
+ * waiting to acquire are notified, and semaphore is not safe to be used anymore.
+ *
+ * @return True if semaphore is not safe to be used anymore.
+ */
+ protected boolean isBroken() {
+ return broken;
+ }
+
+ /** Flag indicating that a node failed and it is not safe to continue using this semaphore.
+ * Any attempt to acquire on broken semaphore will result in {@linkplain IgniteInterruptedException}.
+ *
+ * @param broken True if semaphore should not be used anymore.
+ * */
+ protected void setBroken(boolean broken) {
+ this.broken = broken;
+ }
+
+ /**
* This method is used by the AQS to test if the current thread should block or not.
*
* @param acquires Number of permits to acquire.
@@ -186,6 +209,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
+ // If broken, return immediately, exception will be thrown anyway.
+ if(broken)
+ return 1;
+
int available = getState();
int remaining = available - acquires;
@@ -209,6 +236,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return true;
for (;;) {
+ // If broken, return immediately, exception will be thrown anyway.
+ if(broken)
+ return true;
+
int cur = getState();
int next = cur + releases;
@@ -228,6 +259,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
*/
final int drainPermits() {
for (;;) {
+ // If broken, return immediately, exception will be thrown anyway.
+ if(broken)
+ return 1;
int current = getState();
@@ -504,7 +538,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.releaseFailedNode(nodeId);
else {
// Interrupt every waiting thread if this semaphore is not failover safe.
- sync.broken = true;
+ sync.setBroken(true);
for (Thread t : sync.getSharedQueuedThreads())
t.interrupt();
@@ -515,6 +549,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
+ @Override public void stop() {
+ sync.setBroken(true);
+
+ // Try to notify any waiting threads.
+ sync.releaseShared(0);
+ }
+
/** {@inheritDoc} */
@Override public void needCheckNotRemoved() {
// No-op.
@@ -527,15 +568,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** {@inheritDoc} */
@Override public void acquire(int permits) throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
- if(isBroken())
- Thread.currentThread().interrupt();
-
sync.acquireSharedInterruptibly(permits);
+
+ if(isBroken())
+ throw new InterruptedException();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -543,10 +586,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public void acquireUninterruptibly() {
+ ctx.kernalContext().gateway().readLock();
+
try {
initializeSemaphore();
@@ -555,10 +603,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public void acquireUninterruptibly(int permits) {
+ ctx.kernalContext().gateway().readLock();
+
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -569,10 +622,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public int availablePermits() {
+ ctx.kernalContext().gateway().readLock();
+
int ret;
try {
initializeSemaphore();
@@ -603,12 +661,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
return ret;
}
/** {@inheritDoc} */
@Override public int drainPermits() {
+ ctx.kernalContext().gateway().readLock();
+
try {
initializeSemaphore();
@@ -617,26 +680,49 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean tryAcquire() {
+ ctx.kernalContext().gateway().readLock();
+
try {
initializeSemaphore();
- return sync.nonfairTryAcquireShared(1) >= 0;
+ boolean result = sync.nonfairTryAcquireShared(1) >= 0;
+
+ if(isBroken())
+ throw new InterruptedException();
+
+ return result;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+ ctx.kernalContext().gateway().readLock();
+
try {
initializeSemaphore();
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+
+ if(isBroken())
+ throw new InterruptedException();
+
+ return result;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -644,6 +730,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@@ -653,6 +742,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
/** {@inheritDoc} */
@Override public void release(int permits) {
+ ctx.kernalContext().gateway().readLock();
+
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -663,10 +754,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean tryAcquire(int permits) {
+ ctx.kernalContext().gateway().readLock();
+
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -677,15 +773,25 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+ ctx.kernalContext().gateway().readLock();
+
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
- return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+ boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+
+ if(isBroken())
+ throw new InterruptedException();
+
+ return result;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -693,15 +799,32 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean isFailoverSafe() {
- return sync.failoverSafe;
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ initializeSemaphore();
+
+ return sync.failoverSafe;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean hasQueuedThreads() {
+ ctx.kernalContext().gateway().readLock();
+
try {
initializeSemaphore();
@@ -710,10 +833,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public int getQueueLength() {
+ ctx.kernalContext().gateway().readLock();
+
try {
initializeSemaphore();
@@ -722,11 +850,26 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
@Override public boolean isBroken(){
- return sync.broken;
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ initializeSemaphore();
+
+ return sync.isBroken();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b63cee45/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index e85468e..fbd72bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -454,6 +454,50 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
+ public void testSemaphoreSingleNodeFailure() throws Exception {
+ final Ignite i1 = grid(0);
+
+ IgniteSemaphore sem1 = i1.semaphore(STRUCTURE_NAME, 1, false, true);
+
+ sem1.acquire();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ boolean failed = true;
+
+ IgniteSemaphore sem2 = i1.semaphore(STRUCTURE_NAME, 1, false, true);
+
+ try {
+ sem2.acquire();
+ }
+ catch (Exception e){
+ failed = false;
+ }
+ finally {
+ assertFalse(failed);
+
+ sem2.release();
+ }
+ return null;
+ }
+ });
+
+ while(!sem1.hasQueuedThreads()){
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ fail();
+ }
+ }
+
+ i1.close();
+
+ fut.get();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception {
doTestSemaphore(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true);
}